| | |
| | | * inserted in the DB. After insert, it is updated with the CSN of the change |
| | | * currently processed (thus becoming the "current" cookie just before the |
| | | * change is returned. |
| | | * <p> |
| | | * Note: This object is only updated by changes/updates. |
| | | * |
| | | * @see <a href= |
| | | * "https://wikis.forgerock.org/confluence/display/OPENDJ/OpenDJ+Domain+Names" |
| | |
| | | */ |
| | | private final MultiDomainServerState mediumConsistencyRUV = |
| | | new MultiDomainServerState(); |
| | | /** |
| | | * Holds the cross domain medium consistency baseDN and CSN for the current |
| | | * replication server. |
| | | * |
| | | * @see <a href= |
| | | * "https://wikis.forgerock.org/confluence/display/OPENDJ/OpenDJ+Domain+Names" |
| | | * >OpenDJ Domain Names - medium consistency CSN</a> |
| | | */ |
| | | private volatile Pair<DN, CSN> mediumConsistency; |
| | | |
| | | /** |
| | | * Holds the last time each replica was seen alive, whether via updates or |
| | |
| | | * <p> |
| | | * Updates are persistent and stored in the replicaDBs, heartbeats are |
| | | * transient and are easily constructed on normal operations. |
| | | * <p> |
| | | * Note: This object is updated by both heartbeats and changes/updates. |
| | | */ |
| | | private final MultiDomainServerState lastAliveCSNs = |
| | | new MultiDomainServerState(); |
| | | /** Note: This object is updated by replica offline messages. */ |
| | | private final MultiDomainServerState replicasOffline = |
| | | new MultiDomainServerState(); |
| | | |
| | |
| | | * positioned on the next change that needs to be inserted in the CNIndexDB. |
| | | * <p> |
| | | * Note: it is only accessed from the {@link #run()} method. |
| | | * |
| | | * @NonNull |
| | | */ |
| | | private CompositeDBCursor<DN> nextChangeForInsertDBCursor; |
| | | @SuppressWarnings("unchecked") |
| | | private CompositeDBCursor<DN> nextChangeForInsertDBCursor = |
| | | new CompositeDBCursor<DN>(Collections.EMPTY_MAP, false); |
| | | |
| | | /** |
| | | * New cursors for this Map must be created from the {@link #run()} method, |
| | |
| | | return; |
| | | } |
| | | |
| | | final CSN oldestCSNBefore = getOldestLastAliveCSN(); |
| | | lastAliveCSNs.update(baseDN, heartbeatCSN); |
| | | tryNotify(); |
| | | tryNotify(oldestCSNBefore); |
| | | } |
| | | |
| | | /** |
| | |
| | | final CSN csn = updateMsg.getCSN(); |
| | | // only keep the oldest CSN that will be the new cursor's starting point |
| | | newCursors.putIfAbsent(Pair.of(baseDN, csn.getServerId()), csn); |
| | | final CSN oldestCSNBefore = getOldestLastAliveCSN(); |
| | | lastAliveCSNs.update(baseDN, csn); |
| | | tryNotify(); |
| | | tryNotify(oldestCSNBefore); |
| | | } |
| | | |
| | | /** |
| | |
| | | */ |
| | | public void replicaOffline(DN baseDN, CSN offlineCSN) |
| | | { |
| | | if (!isECLEnabledDomain(baseDN)) |
| | | { |
| | | return; |
| | | } |
| | | |
| | | replicasOffline.update(baseDN, offlineCSN); |
| | | final CSN oldestCSNBefore = getOldestLastAliveCSN(); |
| | | lastAliveCSNs.update(baseDN, offlineCSN); |
| | | tryNotify(); |
| | | tryNotify(oldestCSNBefore); |
| | | } |
| | | |
| | | private CSN getOldestLastAliveCSN() |
| | | { |
| | | return lastAliveCSNs.getOldestCSNExcluding(replicasOffline).getSecond(); |
| | | } |
| | | |
| | | /** |
| | | * Notifies the Change number indexer thread if it will be able to do some |
| | | * work. |
| | | */ |
| | | private void tryNotify() |
| | | private void tryNotify(final CSN oldestCSNBefore) |
| | | { |
| | | if (canMoveForwardMediumConsistencyPoint()) |
| | | if (mightMoveForwardMediumConsistencyPoint(oldestCSNBefore)) |
| | | { |
| | | synchronized (this) |
| | | { |
| | |
| | | } |
| | | } |
| | | |
| | | private boolean canMoveForwardMediumConsistencyPoint() |
| | | /** |
| | | * Used for waking up the {@link ChangeNumberIndexer} thread because it might |
| | | * have some work to do. |
| | | */ |
| | | private boolean mightMoveForwardMediumConsistencyPoint(CSN oldestCSNBefore) |
| | | { |
| | | final Pair<DN, CSN> mc = mediumConsistency; |
| | | if (mc != null) |
| | | { |
| | | final CSN mcCSN = mc.getSecond(); |
| | | final CSN lastTimeSameReplicaSeenAlive = |
| | | lastAliveCSNs.getCSN(mc.getFirst(), mcCSN.getServerId()); |
| | | return mcCSN.isOlderThan(lastTimeSameReplicaSeenAlive); |
| | | } |
| | | final CSN oldestCSNAfter = getOldestLastAliveCSN(); |
| | | // ensure that all initial replicas alive information have been updated |
| | | // with CSNs that are acceptable for moving the medium consistency forward |
| | | return allInitialReplicasAreOfflineOrAlive(); |
| | | return allInitialReplicasAreOfflineOrAlive() |
| | | && oldestCSNBefore != null // then oldestCSNAfter cannot be null |
| | | // has the oldest CSN changed? |
| | | && oldestCSNBefore.isOlderThan(oldestCSNAfter); |
| | | } |
| | | |
| | | /** |
| | | * Used by the {@link ChangeNumberIndexer} thread to determine whether the CSN |
| | | * must be persisted to the change number index DB. |
| | | */ |
| | | private boolean canMoveForwardMediumConsistencyPoint(CSN nextCSNToPersist) |
| | | { |
| | | // ensure that all initial replicas alive information have been updated |
| | | // with CSNs that are acceptable for moving the medium consistency forward |
| | | return allInitialReplicasAreOfflineOrAlive() |
| | | // can we persist the next CSN? |
| | | && nextCSNToPersist.isOlderThanOrEqualTo(getOldestLastAliveCSN()); |
| | | } |
| | | |
| | | /** |
| | |
| | | * CSN has been updated to something past the oldest possible CSN), we have |
| | | * enough info to compute medium consistency</li> |
| | | * </ul> |
| | | * In this case, we have enough information to compute medium consistency |
| | | * without waiting any more. |
| | | * In both cases, we have enough information to compute medium consistency |
| | | * without waiting any further. |
| | | */ |
| | | private boolean allInitialReplicasAreOfflineOrAlive() |
| | | { |
| | |
| | | { |
| | | for (CSN csn : lastAliveCSNs.getServerState(baseDN)) |
| | | { |
| | | if (// oldest possible CSN? |
| | | csn.getTime() == 0 |
| | | // replica is not offline |
| | | if (csn.getTime() == 0 |
| | | && replicasOffline.getCSN(baseDN, csn.getServerId()) == null) |
| | | { |
| | | // this is the oldest possible CSN, but the replica is not offline |
| | | // we must wait for more up to date information from this replica |
| | | return false; |
| | | } |
| | | } |
| | |
| | | final DN baseDN = nextChangeForInsertDBCursor.getData(); |
| | | // FIXME problem: what if the serverId is not part of the ServerState? |
| | | // right now, change number will be blocked |
| | | if (!canMoveForwardMediumConsistencyPoint()) |
| | | if (!canMoveForwardMediumConsistencyPoint(csn)) |
| | | { |
| | | // the oldest record to insert is newer than the medium consistency |
| | | // point. Let's wait for a change that can be published. |
| | | synchronized (this) |
| | | { |
| | | // double check to protect against a missed call to notify() |
| | | if (!isShutdownInitiated() |
| | | && !canMoveForwardMediumConsistencyPoint()) |
| | | if (!canMoveForwardMediumConsistencyPoint(csn)) |
| | | { |
| | | if (isShutdownInitiated()) |
| | | { |
| | | return; |
| | | } |
| | | wait(); |
| | | // loop to check if changes older than the medium consistency |
| | | // point have been added to the ReplicaDBs |
| | |
| | | private void moveForwardMediumConsistencyPoint(final CSN mcCSN, |
| | | final DN mcBaseDN) throws ChangelogException |
| | | { |
| | | boolean callNextOnCursor = true; |
| | | // update, so it becomes the previous cookie for the next change |
| | | mediumConsistencyRUV.update(mcBaseDN, mcCSN); |
| | | mediumConsistency = Pair.of(mcBaseDN, mcCSN); |
| | | |
| | | boolean callNextOnCursor = true; |
| | | final int mcServerId = mcCSN.getServerId(); |
| | | final CSN offlineCSN = replicasOffline.getCSN(mcBaseDN, mcServerId); |
| | | final CSN lastAliveCSN = lastAliveCSNs.getCSN(mcBaseDN, mcServerId); |