| | |
| | | import java.util.Map.Entry; |
| | | import java.util.concurrent.ConcurrentMap; |
| | | import java.util.concurrent.ConcurrentSkipListMap; |
| | | import java.util.concurrent.atomic.AtomicBoolean; |
| | | |
| | | import org.opends.messages.Message; |
| | | import org.opends.server.api.DirectoryThread; |
| | |
| | | import org.opends.server.types.DN; |
| | | import org.opends.server.types.DebugLogLevel; |
| | | import org.opends.server.types.DirectoryException; |
| | | import org.opends.server.util.StaticUtils; |
| | | |
| | | import com.forgerock.opendj.util.Pair; |
| | | |
| | | import static org.opends.server.loggers.debug.DebugLogger.*; |
| | | |
| | |
| | | /** The tracer object for the debug logger. */ |
| | | private static final DebugTracer TRACER = getTracer(); |
| | | |
| | | /** |
| | | * If this is true, then the {@link #run()} method must clear its state. |
| | | * Otherwise the run method executes normally. |
| | | */ |
| | | private final AtomicBoolean doClear = new AtomicBoolean(); |
| | | private final ChangelogDB changelogDB; |
| | | /** Only used for initialization, and then discarded. */ |
| | | private ChangelogState changelogState; |
| | |
| | | new MultiDomainServerState(); |
| | | |
| | | /** |
| | | * Composite cursor across all the replicaDBs for all the replication domains. |
| | | * It is volatile to ensure it supports concurrent update. Each time it is |
| | | * used more than once in a method, the method must take a local copy to |
| | | * ensure the cursor does not get updated in the middle of the method. |
| | | * Cursor across all the replicaDBs for all the replication domains. It is |
| | | * positioned on the next change that needs to be inserted in the CNIndexDB. |
| | | * <p> |
| | | * Note: it is only accessed from the {@link #run()} method. |
| | | */ |
| | | private volatile CompositeDBCursor<DN> crossDomainDBCursor; |
| | | private CompositeDBCursor<DN> nextChangeForInsertDBCursor; |
| | | |
| | | /** |
| | | * New cursors for this Map must be created from the {@link #run()} method, |
| | |
| | | */ |
| | | private Map<DN, Map<Integer, DBCursor<UpdateMsg>>> allCursors = |
| | | new HashMap<DN, Map<Integer, DBCursor<UpdateMsg>>>(); |
| | | /** This map can be updated by multiple threads. */ |
| | | private ConcurrentMap<CSN, DN> newCursors = |
| | | new ConcurrentSkipListMap<CSN, DN>(); |
| | | /** |
| | | * Cursors waiting to be created by the next iteration of the |
| | | * {@link #run()} method. Each key is a (baseDN, serverId) pair and each |
| | | * value is the CSN the new cursor will start from. |
| | | * <p> |
| | | * Keys are sorted by baseDN first, then by serverId. |
| | | * <p> |
| | | * This map can be updated by multiple threads. |
| | | */ |
| | | private ConcurrentMap<Pair<DN, Integer>, CSN> newCursors = |
| | | new ConcurrentSkipListMap<Pair<DN, Integer>, CSN>( |
| | | new Comparator<Pair<DN, Integer>>() |
| | | { |
| | | @Override |
| | | public int compare(Pair<DN, Integer> left, Pair<DN, Integer> right) |
| | | { |
| | | final int byBaseDN = left.getFirst().compareTo(right.getFirst()); |
| | | return byBaseDN != 0 |
| | | ? byBaseDN |
| | | : left.getSecond().compareTo(right.getSecond()); |
| | | } |
| | | }); |
| | | |
| | | /** |
| | | * Builds a ChangeNumberIndexer object. |
| | |
| | | { |
| | | final CSN csn = updateMsg.getCSN(); |
| | | lastSeenUpdates.update(baseDN, csn); |
| | | newCursors.put(csn, baseDN); |
| | | // only keep the oldest CSN that will be the new cursor's starting point |
| | | newCursors.putIfAbsent(Pair.of(baseDN, csn.getServerId()), csn); |
| | | tryNotify(baseDN); |
| | | } |
| | | |
| | |
| | | return true; |
| | | } |
| | | |
| | | /** |
| | | * Restores in memory data needed to build the CNIndexDB, including the medium |
| | | * consistency point. |
| | | */ |
| | | private void initialize() throws ChangelogException, DirectoryException |
| | | { |
| | | final ChangeNumberIndexRecord newestRecord = |
| | | changelogDB.getChangeNumberIndexDB().getNewestRecord(); |
| | | if (newestRecord != null) |
| | | { |
| | | // restore the mediumConsistencyRUV from DB |
| | | mediumConsistencyRUV.update( |
| | | new MultiDomainServerState(newestRecord.getPreviousCookie())); |
| | | } |
| | | |
| | | // initialize the cross domain DB cursor |
| | | // initialize the DB cursor and the last seen updates |
| | | // to ensure the medium consistency CSN can move forward |
| | | final ReplicationDomainDB domainDB = changelogDB.getReplicationDomainDB(); |
| | | for (Entry<DN, List<Integer>> entry |
| | | : changelogState.getDomainToServerIds().entrySet()) |
| | |
| | | ServerState latestKnownState = domainDB.getDomainNewestCSNs(baseDN); |
| | | lastSeenUpdates.update(baseDN, latestKnownState); |
| | | } |
| | | resetNextChangeForInsertDBCursor(); |
| | | |
| | | crossDomainDBCursor = newCompositeDBCursor(); |
| | | if (newestRecord != null) |
| | | { |
| | | // restore the "previousCookie" state before shutdown |
| | | final UpdateMsg record = crossDomainDBCursor.getRecord(); |
| | | final UpdateMsg record = nextChangeForInsertDBCursor.getRecord(); |
| | | if (!record.getCSN().equals(newestRecord.getCSN())) |
| | | { |
| | | // TODO JNR i18n safety check, should never happen |
| | |
| | | + record.getCSN() + " newestRecordCSN=" + newestRecord.getCSN())); |
| | | } |
| | | mediumConsistencyRUV.update(newestRecord.getBaseDN(), record.getCSN()); |
| | | crossDomainDBCursor.next(); |
| | | nextChangeForInsertDBCursor.next(); |
| | | } |
| | | |
| | | // this will not be used any more. Discard for garbage collection. |
| | | this.changelogState = null; |
| | | } |
| | | |
| | | private CompositeDBCursor<DN> newCompositeDBCursor() throws ChangelogException |
| | | private void resetNextChangeForInsertDBCursor() throws ChangelogException |
| | | { |
| | | final Map<DBCursor<UpdateMsg>, DN> cursors = |
| | | new HashMap<DBCursor<UpdateMsg>, DN>(); |
| | |
| | | } |
| | | final CompositeDBCursor<DN> result = new CompositeDBCursor<DN>(cursors); |
| | | result.next(); |
| | | return result; |
| | | nextChangeForInsertDBCursor = result; |
| | | } |
| | | |
| | | private boolean ensureCursorExists(DN baseDN, Integer serverId, CSN csn) |
| | |
| | | if (cursor == null) |
| | | { |
| | | final ReplicationDomainDB domainDB = changelogDB.getReplicationDomainDB(); |
| | | cursor = domainDB.getCursorFrom(baseDN, serverId, csn); |
| | | // use an older CSN because getCursorFrom() starts after the given CSN |
| | | final CSN anOlderCSN = getPrecedingCSN(csn); |
| | | cursor = domainDB.getCursorFrom(baseDN, serverId, anOlderCSN); |
| | | map.put(serverId, cursor); |
| | | return false; |
| | | } |
| | | return true; |
| | | } |
| | | |
| | | /** |
| | | * Computes the CSN immediately preceding the supplied one, for the same |
| | | * server id. |
| | | * |
| | | * @param csn |
| | | * the CSN whose predecessor is wanted |
| | | * @return a new CSN with the sequence number decremented by one, or, when the |
| | | * sequence number is not positive, a new CSN with the time |
| | | * decremented by one and the sequence number set to |
| | | * {@link Integer#MAX_VALUE} |
| | | */ |
| | | private CSN getPrecedingCSN(CSN csn) |
| | | { |
| | | if (csn.getSeqnum() <= 0) |
| | | { |
| | | // no smaller sequence number is available: step back one time unit |
| | | return new CSN(csn.getTime() - 1, Integer.MAX_VALUE, csn.getServerId()); |
| | | } |
| | | return new CSN(csn.getTime(), csn.getSeqnum() - 1, csn.getServerId()); |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public void run() |
| | |
| | | * used. |
| | | */ |
| | | initialize(); |
| | | } |
| | | catch (DirectoryException e) |
| | | { |
| | | // TODO JNR error message i18n |
| | | if (debugEnabled()) |
| | | TRACER.debugCaught(DebugLogLevel.ERROR, e); |
| | | return; |
| | | |
| | | while (!isShutdownInitiated()) |
| | | { |
| | | try |
| | | { |
| | | if (doClear.get()) |
| | | { |
| | | removeAllCursors(); |
| | | // No need to use CAS here because it is only for unit tests and at |
| | | // this point all will have been cleaned up anyway. |
| | | doClear.set(false); |
| | | } |
| | | else |
| | | { |
| | | createNewCursors(); |
| | | } |
| | | |
| | | final UpdateMsg msg = nextChangeForInsertDBCursor.getRecord(); |
| | | if (msg == null) |
| | | { |
| | | synchronized (this) |
| | | { |
| | | wait(); |
| | | } |
| | | // advance cursor, success/failure will be checked later |
| | | nextChangeForInsertDBCursor.next(); |
| | | // loop to check whether new changes have been added to the |
| | | // ReplicaDBs |
| | | continue; |
| | | } |
| | | |
| | | final CSN csn = msg.getCSN(); |
| | | final DN baseDN = nextChangeForInsertDBCursor.getData(); |
| | | // FIXME problem: what if the serverId is not part of the ServerState? |
| | | // right now, thread will be blocked |
| | | if (!canMoveForwardMediumConsistencyPoint(baseDN)) |
| | | { |
| | | // the oldest record to insert is newer than the medium consistency |
| | | // point. Let's wait for a change that can be published. |
| | | synchronized (this) |
| | | { |
| | | // double check to protect against a missed call to notify() |
| | | if (!canMoveForwardMediumConsistencyPoint(baseDN)) |
| | | { |
| | | wait(); |
| | | // loop to check if changes older than the medium consistency |
| | | // point have been added to the ReplicaDBs |
| | | continue; |
| | | } |
| | | } |
| | | } |
| | | |
| | | |
| | | // OK, the oldest change is older than the medium consistency point |
| | | // let's publish it to the CNIndexDB. |
| | | |
| | | // Next if statement is ugly but ensures the first change will not be |
| | | // immediately trimmed from the CNIndexDB. Yuck! |
| | | if (mediumConsistencyRUV.isEmpty()) |
| | | { |
| | | mediumConsistencyRUV.replace(baseDN, new ServerState()); |
| | | } |
| | | final String previousCookie = mediumConsistencyRUV.toString(); |
| | | final ChangeNumberIndexRecord record = |
| | | new ChangeNumberIndexRecord(previousCookie, baseDN, csn); |
| | | changelogDB.getChangeNumberIndexDB().addRecord(record); |
| | | moveForwardMediumConsistencyPoint(csn, baseDN); |
| | | |
| | | // advance cursor, success/failure will be checked later |
| | | nextChangeForInsertDBCursor.next(); |
| | | } |
| | | catch (InterruptedException ignored) |
| | | { |
| | | // was shutdown called? loop to figure it out. |
| | | Thread.currentThread().interrupt(); |
| | | } |
| | | } |
| | | removeAllCursors(); |
| | | } |
| | | catch (ChangelogException e) |
| | | { |
| | | // TODO JNR error message i18n |
| | | if (debugEnabled()) |
| | | TRACER.debugCaught(DebugLogLevel.ERROR, e); |
| | | return; |
| | | // TODO JNR error message i18n |
| | | } |
| | | |
| | | while (!isShutdownInitiated()) |
| | | catch (DirectoryException e) |
| | | { |
| | | try |
| | | { |
| | | createNewCursors(); |
| | | |
| | | final UpdateMsg msg = crossDomainDBCursor.getRecord(); |
| | | if (msg == null) |
| | | { |
| | | synchronized (this) |
| | | { |
| | | wait(); |
| | | } |
| | | // advance cursor, success/failure will be checked later |
| | | crossDomainDBCursor.next(); |
| | | // loop to check whether new changes have been added to the ReplicaDBs |
| | | continue; |
| | | } |
| | | |
| | | final CSN csn = msg.getCSN(); |
| | | final DN baseDN = crossDomainDBCursor.getData(); |
| | | // FIXME problem: what if the serverId is not part of the ServerState? |
| | | // right now, thread will be blocked |
| | | if (!canMoveForwardMediumConsistencyPoint(baseDN)) |
| | | { |
| | | // the oldest record to insert is newer than the medium consistency |
| | | // point. Let's wait for a change that can be published. |
| | | synchronized (this) |
| | | { |
| | | // double check to protect against a missed call to notify() |
| | | if (!canMoveForwardMediumConsistencyPoint(baseDN)) |
| | | { |
| | | wait(); |
| | | // loop to check if changes older than the medium consistency |
| | | // point have been added to the ReplicaDBs |
| | | continue; |
| | | } |
| | | } |
| | | } |
| | | |
| | | // OK, the oldest change is older than the medium consistency point |
| | | // let's publish it to the CNIndexDB |
| | | final String previousCookie = mediumConsistencyRUV.toString(); |
| | | final ChangeNumberIndexRecord record = |
| | | new ChangeNumberIndexRecord(previousCookie, baseDN, csn); |
| | | changelogDB.getChangeNumberIndexDB().addRecord(record); |
| | | moveForwardMediumConsistencyPoint(csn, baseDN); |
| | | |
| | | // advance cursor, success/failure will be checked later |
| | | crossDomainDBCursor.next(); |
| | | } |
| | | catch (ChangelogException e) |
| | | { |
| | | if (debugEnabled()) |
| | | TRACER.debugCaught(DebugLogLevel.ERROR, e); |
| | | // TODO JNR error message i18n |
| | | } |
| | | catch (InterruptedException ignored) |
| | | { |
| | | // was shutdown called? |
| | | } |
| | | if (debugEnabled()) |
| | | TRACER.debugCaught(DebugLogLevel.ERROR, e); |
| | | // TODO JNR error message i18n |
| | | } |
| | | } |
| | | |
| | |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Closes every cursor held in {@link #allCursors}, empties both |
| | | * {@link #allCursors} and {@link #newCursors}, then rebuilds the cursor used |
| | | * to find the next change to insert. |
| | | * |
| | | * @throws ChangelogException |
| | | * propagated from {@link #resetNextChangeForInsertDBCursor()} |
| | | */ |
| | | private void removeAllCursors() throws ChangelogException |
| | | { |
| | | for (Entry<DN, Map<Integer, DBCursor<UpdateMsg>>> domainEntry |
| | | : allCursors.entrySet()) |
| | | { |
| | | StaticUtils.close(domainEntry.getValue().values()); |
| | | } |
| | | allCursors.clear(); |
| | | newCursors.clear(); |
| | | resetNextChangeForInsertDBCursor(); |
| | | } |
| | | |
| | | private void removeCursor(final DN baseDN, final CSN csn) |
| | | { |
| | | for (Entry<DN, Map<Integer, DBCursor<UpdateMsg>>> entry : allCursors |
| | | .entrySet()) |
| | | for (Entry<DN, Map<Integer, DBCursor<UpdateMsg>>> entry1 |
| | | : allCursors.entrySet()) |
| | | { |
| | | if (baseDN.equals(entry.getKey())) |
| | | if (baseDN.equals(entry1.getKey())) |
| | | { |
| | | final Set<Integer> serverIds = entry.getValue().keySet(); |
| | | for (Iterator<Integer> iter = serverIds.iterator(); iter.hasNext();) |
| | | for (Iterator<Entry<Integer, DBCursor<UpdateMsg>>> iter = |
| | | entry1.getValue().entrySet().iterator(); iter.hasNext();) |
| | | { |
| | | final int serverId = iter.next(); |
| | | if (csn.getServerId() == serverId) |
| | | final Entry<Integer, DBCursor<UpdateMsg>> entry2 = iter.next(); |
| | | if (csn.getServerId() == entry2.getKey()) |
| | | { |
| | | iter.remove(); |
| | | StaticUtils.close(entry2.getValue()); |
| | | return; |
| | | } |
| | | } |
| | |
| | | if (!newCursors.isEmpty()) |
| | | { |
| | | boolean newCursorAdded = false; |
| | | for (Iterator<Entry<CSN, DN>> iter = newCursors.entrySet().iterator(); |
| | | iter.hasNext();) |
| | | for (Iterator<Entry<Pair<DN, Integer>, CSN>> iter = |
| | | newCursors.entrySet().iterator(); iter.hasNext();) |
| | | { |
| | | final Entry<CSN, DN> entry = iter.next(); |
| | | final CSN csn = entry.getKey(); |
| | | if (!ensureCursorExists(entry.getValue(), csn.getServerId(), null)) |
| | | final Entry<Pair<DN, Integer>, CSN> entry = iter.next(); |
| | | final DN baseDN = entry.getKey().getFirst(); |
| | | final CSN csn = entry.getValue(); |
| | | if (!ensureCursorExists(baseDN, csn.getServerId(), csn)) |
| | | { |
| | | newCursorAdded = true; |
| | | } |
| | |
| | | } |
| | | if (newCursorAdded) |
| | | { |
| | | crossDomainDBCursor = newCompositeDBCursor(); |
| | | resetNextChangeForInsertDBCursor(); |
| | | } |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Asks this indexer thread (the one executing {@link #run()}) to clear its |
| | | * state. |
| | | * <p> |
| | | * Raises the {@code doClear} flag, wakes up one thread waiting on this |
| | | * object's monitor, then spins (yielding) until the flag is lowered again. |
| | | * <p> |
| | | * NOTE(review): the spin loop has no timeout; if nothing ever resets |
| | | * {@code doClear}, this call does not return. |
| | | * <p> |
| | | * This method is only useful for unit tests. |
| | | */ |
| | | public void clear() |
| | | { |
| | | doClear.set(true); |
| | | synchronized (this) |
| | | { |
| | | notify(); |
| | | } |
| | | while (doClear.get()) |
| | | { |
| | | // wait until clear() has been done by thread |
| | | // ensures unit tests wait that this thread's state is cleaned up |
| | | Thread.yield(); |
| | | } |
| | | } |
| | | |
| | | } |