| | |
| | | private final MultiDomainServerState mediumConsistencyRUV = |
| | | new MultiDomainServerState(); |
| | | /** |
| | | * Holds the cross domain medium consistency CSN for the current replication |
| | | * server. |
| | | * Holds the cross domain medium consistency baseDN and CSN for the current |
| | | * replication server. |
| | | * |
| | | * @see <a href= |
| | | * "https://wikis.forgerock.org/confluence/display/OPENDJ/OpenDJ+Domain+Names" |
| | | * >OpenDJ Domain Names - medium consistency CSN</a> |
| | | */ |
| | | private volatile CSN mediumConsistencyCSN; |
| | | private volatile Pair<DN, CSN> mediumConsistency; |
| | | |
| | | /** |
| | | * Holds the last time each replica was seen alive, whether via updates or |
| | |
| | | } |
| | | |
| | | lastAliveCSNs.update(baseDN, heartbeatCSN); |
| | | tryNotify(baseDN); |
| | | tryNotify(); |
| | | } |
| | | |
| | | /** |
| | |
| | | // only keep the oldest CSN that will be the new cursor's starting point |
| | | newCursors.putIfAbsent(Pair.of(baseDN, csn.getServerId()), csn); |
| | | lastAliveCSNs.update(baseDN, csn); |
| | | tryNotify(baseDN); |
| | | tryNotify(); |
| | | } |
| | | |
| | | /** |
| | |
| | | { |
| | | replicasOffline.update(baseDN, offlineCSN); |
| | | lastAliveCSNs.update(baseDN, offlineCSN); |
| | | tryNotify(baseDN); |
| | | tryNotify(); |
| | | } |
| | | |
| | | /** |
| | | * Notifies the Change number indexer thread if it will be able to do some |
| | | * work. |
| | | */ |
| | | private void tryNotify(DN baseDN) |
| | | private void tryNotify() |
| | | { |
| | | if (canMoveForwardMediumConsistencyPoint(baseDN)) |
| | | if (canMoveForwardMediumConsistencyPoint()) |
| | | { |
| | | synchronized (this) |
| | | { |
| | |
| | | } |
| | | } |
| | | |
| | | private boolean canMoveForwardMediumConsistencyPoint(DN baseDN) |
| | | private boolean canMoveForwardMediumConsistencyPoint() |
| | | { |
| | | final CSN mcCSN = mediumConsistencyCSN; |
| | | if (mcCSN != null) |
| | | final Pair<DN, CSN> mc = mediumConsistency; |
| | | if (mc != null) |
| | | { |
| | | final int serverId = mcCSN.getServerId(); |
| | | CSN lastTimeSameReplicaSeenAlive = lastAliveCSNs.getCSN(baseDN, serverId); |
| | | final CSN mcCSN = mc.getSecond(); |
| | | final CSN lastTimeSameReplicaSeenAlive = |
| | | lastAliveCSNs.getCSN(mc.getFirst(), mcCSN.getServerId()); |
| | | return mcCSN.isOlderThan(lastTimeSameReplicaSeenAlive); |
| | | } |
| | | return true; |
| | |
| | | } |
| | | wait(); |
| | | } |
| | | // advance cursor, success/failure will be checked later |
| | | // try to recycle the exhausted cursors, |
| | | // success/failure will be checked later |
| | | nextChangeForInsertDBCursor.next(); |
| | | // loop to check whether new changes have been added to the |
| | | // ReplicaDBs |
| | |
| | | final DN baseDN = nextChangeForInsertDBCursor.getData(); |
| | | // FIXME problem: what if the serverId is not part of the ServerState? |
| | | // right now, change number will be blocked |
| | | if (!canMoveForwardMediumConsistencyPoint(baseDN)) |
| | | if (!canMoveForwardMediumConsistencyPoint()) |
| | | { |
| | | // the oldest record to insert is newer than the medium consistency |
| | | // point. Let's wait for a change that can be published. |
| | |
| | | { |
| | | // double check to protect against a missed call to notify() |
| | | if (!isShutdownInitiated() |
| | | && !canMoveForwardMediumConsistencyPoint(baseDN)) |
| | | && !canMoveForwardMediumConsistencyPoint()) |
| | | { |
| | | wait(); |
| | | // loop to check if changes older than the medium consistency |
| | |
| | | changelogDB.getChangeNumberIndexDB().addRecord(record); |
| | | moveForwardMediumConsistencyPoint(csn, baseDN); |
| | | |
| | | // advance cursor, success/failure will be checked later |
| | | // advance the cursor we just read from, |
| | | // success/failure will be checked later |
| | | nextChangeForInsertDBCursor.next(); |
| | | } |
| | | catch (InterruptedException ignored) |
| | |
| | | } |
| | | } |
| | | |
| | | private void moveForwardMediumConsistencyPoint(final CSN csn, final DN baseDN) |
| | | private void moveForwardMediumConsistencyPoint(final CSN mcCSN, |
| | | final DN mcBaseDN) |
| | | { |
| | | // update the medium consistency RUV, so it becomes the previous cookie for the next change |
| | | mediumConsistencyRUV.update(baseDN, csn); |
| | | mediumConsistencyCSN = csn; |
| | | final CSN offlineCSN = replicasOffline.getCSN(baseDN, csn.getServerId()); |
| | | mediumConsistencyRUV.update(mcBaseDN, mcCSN); |
| | | mediumConsistency = Pair.of(mcBaseDN, mcCSN); |
| | | final CSN offlineCSN = replicasOffline.getCSN(mcBaseDN, mcCSN.getServerId()); |
| | | if (offlineCSN != null |
| | | && offlineCSN.isOlderThan(mediumConsistencyCSN) |
| | | && offlineCSN.isOlderThan(mcCSN) |
| | | // If no new updates have been seen for this replica |
| | | && lastAliveCSNs.removeCSN(baseDN, offlineCSN)) |
| | | && lastAliveCSNs.removeCSN(mcBaseDN, offlineCSN)) |
| | | { |
| | | removeCursor(baseDN, csn); |
| | | replicasOffline.removeCSN(baseDN, offlineCSN); |
| | | mediumConsistencyRUV.removeCSN(baseDN, offlineCSN); |
| | | removeCursor(mcBaseDN, mcCSN); |
| | | replicasOffline.removeCSN(mcBaseDN, offlineCSN); |
| | | mediumConsistencyRUV.removeCSN(mcBaseDN, offlineCSN); |
| | | } |
| | | } |
| | | |