| | |
| | | import java.util.concurrent.ConcurrentMap; |
| | | import java.util.concurrent.ConcurrentSkipListMap; |
| | | |
| | | import org.opends.messages.Message; |
| | | import org.opends.server.api.DirectoryThread; |
| | | import org.opends.server.loggers.debug.DebugTracer; |
| | | import org.opends.server.replication.common.CSN; |
| | |
| | | private ChangelogState changelogState; |
| | | |
| | | /* |
| | | * previousCookie and mediumConsistencyPoint must be thread safe, because |
| | | * mediumConsistencyRUV and lastSeenUpdates must be thread safe, because |
| | | * 1) initialization can happen while the replication server starts receiving |
| | | * updates 2) many updates can happen concurrently. This solution also avoids |
| | | * using a queue that could fill up before we have consumed all its content. |
| | | */ |
| | | /** |
| | | * Holds the cross domain medium consistency Replication Update Vector for the |
| | | * current replication server, also known as the previous cookie. |
| | | * <p> |
| | | * Stores the value of the cookie before the change currently processed is |
| | | * inserted in the DB. After insert, it is updated with the CSN of the change |
| | | * currently processed (thus becoming the "current" cookie just before the |
| | | * change is returned). |
| | | */ |
| | | private final MultiDomainServerState previousCookie = |
| | | new MultiDomainServerState(); |
| | | |
| | | /** |
| | | * Holds the medium consistency point for the current replication server. |
| | | * |
| | | * @see <a href= |
| | | * "https://wikis.forgerock.org/confluence/display/OPENDJ/OpenDJ+Domain+Names" |
| | | * >OpenDJ Domain Names for a description of what the medium consistency point |
| | | * is</a> |
| | | * >OpenDJ Domain Names - medium consistency RUV</a> |
| | | */ |
| | | private final MultiDomainServerState mediumConsistencyPoint = |
| | | private final MultiDomainServerState mediumConsistencyRUV = |
| | | new MultiDomainServerState(); |
| | | /** |
| | | * Holds the cross domain medium consistency CSN for the current replication |
| | | * server. |
| | | * |
| | | * @see <a href= |
| | | * "https://wikis.forgerock.org/confluence/display/OPENDJ/OpenDJ+Domain+Names" |
| | | * >OpenDJ Domain Names - medium consistency CSN</a> |
| | | */ |
| | | private volatile CSN mediumConsistencyCSN; |
| | | |
| | | /** |
| | | * Holds the most recent changes or heartbeats received for each serverId, |
| | | * across domains. |
| | | */ |
| | | private final MultiDomainServerState lastSeenUpdates = |
| | | new MultiDomainServerState(); |
| | | |
| | | /** |
| | |
| | | private Map<DN, Map<Integer, DBCursor<UpdateMsg>>> allCursors = |
| | | new HashMap<DN, Map<Integer, DBCursor<UpdateMsg>>>(); |
| | | /** This map can be updated by multiple threads. */ |
| | | private ConcurrentMap<Integer, DN> newCursors = |
| | | new ConcurrentSkipListMap<Integer, DN>(); |
| | | private ConcurrentMap<CSN, DN> newCursors = |
| | | new ConcurrentSkipListMap<CSN, DN>(); |
| | | |
| | | /** |
| | | * Builds a ChangeNumberIndexer object. |
| | |
| | | */ |
| | | public void publishHeartbeat(DN baseDN, CSN heartbeatCSN) |
| | | { |
| | | mediumConsistencyPoint.update(baseDN, heartbeatCSN); /* NOTE(review): this body uses both mediumConsistencyPoint and lastSeenUpdates, and both tryNotify arities; it looks like two revisions interleaved - confirm which is current */ |
| | | final CompositeDBCursor<DN> localCursor = crossDomainDBCursor; /* read the field once so both lookups below use the same cursor */ |
| | | final DN changeBaseDN = localCursor.getData(); |
| | | final CSN changeCSN = localCursor.getRecord().getCSN(); |
| | | tryNotify(changeBaseDN, changeCSN); |
| | | lastSeenUpdates.update(baseDN, heartbeatCSN); /* record the heartbeat CSN for this domain */ |
| | | tryNotify(baseDN); /* notify the indexer thread if it will be able to do some work */ |
| | | } |
| | | |
| | | /** |
| | |
| | | throws ChangelogException |
| | | { |
| | | final CSN csn = updateMsg.getCSN(); |
| | | mediumConsistencyPoint.update(baseDN, csn); |
| | | newCursors.put(csn.getServerId(), baseDN); |
| | | tryNotify(baseDN, csn); |
| | | lastSeenUpdates.update(baseDN, csn); |
| | | newCursors.put(csn, baseDN); |
| | | tryNotify(baseDN); |
| | | } |
| | | |
| | | /** |
| | | * Notifies the Change number indexer thread if it will be able to do some |
| | | * work. |
| | | */ |
| | | private void tryNotify(final DN baseDN, final CSN csn) |
| | | private void tryNotify(DN baseDN) |
| | | { |
| | | if (mediumConsistencyPoint.cover(baseDN, csn)) |
| | | if (canMoveForwardMediumConsistencyPoint(baseDN)) |
| | | { |
| | | synchronized (this) |
| | | { |
| | |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Tells whether the medium consistency point may be moved forward for the |
| | | * supplied domain. |
| | | * <p> |
| | | * The volatile medium consistency CSN is read once into a local so that the |
| | | * null check and the comparison below work on the same value. |
| | | * |
| | | * @param baseDN |
| | | *          the domain whose last seen updates are consulted |
| | | * @return true when no medium consistency CSN has been set yet, otherwise |
| | | *         whether that CSN is older than the last CSN seen in this domain |
| | | *         for the same serverId |
| | | */ |
| | | private boolean canMoveForwardMediumConsistencyPoint(DN baseDN) |
| | | { |
| | | final CSN currentCSN = mediumConsistencyCSN; |
| | | if (currentCSN == null) |
| | | { |
| | | // nothing published yet: nothing to compare against |
| | | return true; |
| | | } |
| | | |
| | | final CSN lastSeenForSameServer = |
| | | lastSeenUpdates.getCSN(baseDN, currentCSN.getServerId()); |
| | | return currentCSN.isOlderThan(lastSeenForSameServer); |
| | | } |
| | | |
| | | private void initialize() throws ChangelogException, DirectoryException |
| | | { |
| | | final ChangeNumberIndexRecord newestRecord = |
| | | changelogDB.getChangeNumberIndexDB().getNewestRecord(); |
| | | if (newestRecord != null) |
| | | { |
| | | previousCookie.update( |
| | | mediumConsistencyRUV.update( |
| | | new MultiDomainServerState(newestRecord.getPreviousCookie())); |
| | | } |
| | | |
| | |
| | | final DN baseDN = entry.getKey(); |
| | | for (Integer serverId : entry.getValue()) |
| | | { |
| | | final ServerState previousSS = previousCookie.get(baseDN); |
| | | final CSN csn = previousSS != null ? previousSS.getCSN(serverId) : null; |
| | | final CSN csn = mediumConsistencyRUV.getCSN(baseDN, serverId); |
| | | ensureCursorExists(baseDN, serverId, csn); |
| | | } |
| | | |
| | | ServerState latestKnownState = domainDB.getDomainNewestCSNs(baseDN); |
| | | mediumConsistencyPoint.update(baseDN, latestKnownState); |
| | | lastSeenUpdates.update(baseDN, latestKnownState); |
| | | } |
| | | |
| | | crossDomainDBCursor = newCompositeDBCursor(); |
| | |
| | | final UpdateMsg record = crossDomainDBCursor.getRecord(); |
| | | if (!record.getCSN().equals(newestRecord.getCSN())) |
| | | { |
| | | // TODO JNR remove |
| | | throw new RuntimeException("They do not equal! recordCSN=" |
| | | + record.getCSN() + " newestRecordCSN=" + newestRecord.getCSN()); |
| | | // TODO JNR i18n safety check, should never happen |
| | | throw new ChangelogException(Message.raw("They do not equal! recordCSN=" |
| | | + record.getCSN() + " newestRecordCSN=" + newestRecord.getCSN())); |
| | | } |
| | | // TODO JNR is it possible to use the following line instead? |
| | | // previousCookie.update(newestRecord.getBaseDN(), record.getCSN()); |
| | | // TODO JNR would this mean updating the if above? |
| | | previousCookie.update(crossDomainDBCursor.getData(), record.getCSN()); |
| | | mediumConsistencyRUV.update(newestRecord.getBaseDN(), record.getCSN()); |
| | | crossDomainDBCursor.next(); |
| | | } |
| | | |
| | |
| | | } |
| | | catch (ChangelogException e) |
| | | { |
| | | // TODO Auto-generated catch block |
| | | // TODO JNR error message i18n |
| | | if (debugEnabled()) |
| | | TRACER.debugCaught(DebugLogLevel.ERROR, e); |
| | | return; |
| | |
| | | final DN baseDN = crossDomainDBCursor.getData(); |
| | | // FIXME problem: what if the serverId is not part of the ServerState? |
| | | // right now, thread will be blocked |
| | | if (!mediumConsistencyPoint.cover(baseDN, csn)) |
| | | if (!canMoveForwardMediumConsistencyPoint(baseDN)) |
| | | { |
| | | // the oldest record to insert is newer than the medium consistency |
| | | // point. Let's wait for a change that can be published. |
| | | synchronized (this) |
| | | { |
| | | // double check to protect against a missed call to notify() |
| | | if (!mediumConsistencyPoint.cover(baseDN, csn)) |
| | | if (!canMoveForwardMediumConsistencyPoint(baseDN)) |
| | | { |
| | | wait(); |
| | | // loop to check if changes older than the medium consistency |
| | |
| | | |
| | | // OK, the oldest change is older than the medium consistency point |
| | | // let's publish it to the CNIndexDB |
| | | final String previousCookie = mediumConsistencyRUV.toString(); |
| | | final ChangeNumberIndexRecord record = |
| | | new ChangeNumberIndexRecord(previousCookie.toString(), baseDN, csn); |
| | | new ChangeNumberIndexRecord(previousCookie, baseDN, csn); |
| | | changelogDB.getChangeNumberIndexDB().addRecord(record); |
| | | // update, so it becomes the previous cookie for the next change |
| | | previousCookie.update(baseDN, csn); |
| | | moveForwardMediumConsistencyPoint(csn, baseDN); |
| | | |
| | | // advance cursor, success/failure will be checked later |
| | | crossDomainDBCursor.next(); |
| | |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Moves the medium consistency point forward to the supplied change. |
| | | * |
| | | * @param csn |
| | | *          the CSN that becomes the new medium consistency CSN |
| | | * @param baseDN |
| | | *          the domain for which the medium consistency RUV is updated |
| | | */ |
| | | private void moveForwardMediumConsistencyPoint(final CSN csn, final DN baseDN) |
| | | { |
| | | // the RUV is updated first so that it can serve as the previous cookie |
| | | // of the next change |
| | | this.mediumConsistencyRUV.update(baseDN, csn); |
| | | // then the new medium consistency CSN is written to the volatile field |
| | | this.mediumConsistencyCSN = csn; |
| | | } |
| | | |
| | | private void createNewCursors() throws ChangelogException |
| | | { |
| | | if (!newCursors.isEmpty()) |
| | | { |
| | | boolean newCursorAdded = false; |
| | | for (Iterator<Entry<Integer, DN>> iter = newCursors.entrySet().iterator(); |
| | | for (Iterator<Entry<CSN, DN>> iter = newCursors.entrySet().iterator(); |
| | | iter.hasNext();) |
| | | { |
| | | final Entry<Integer, DN> entry = iter.next(); |
| | | if (!ensureCursorExists(entry.getValue(), entry.getKey(), null)) |
| | | final Entry<CSN, DN> entry = iter.next(); |
| | | final CSN csn = entry.getKey(); |
| | | if (!ensureCursorExists(entry.getValue(), csn.getServerId(), null)) |
| | | { |
| | | newCursorAdded = true; |
| | | } |