| | |
| | | } |
| | | // ensure that the last alive information of all initial replicas has been |
| | | // updated with CSNs that are acceptable for moving the medium consistency forward |
| | | return allInitialReplicasArePastOldestPossibleCSN(); |
| | | return allInitialReplicasAreOfflineOrAlive(); |
| | | } |
| | | |
| | | private boolean allInitialReplicasArePastOldestPossibleCSN() |
| | | /** |
| | | * Returns true only if the initial replicas known from the changelog state DB |
| | | * are either: |
| | | * <ul> |
| | | * <li>offline, so do not wait for them in order to compute medium consistency |
| | | * </li> |
| | | * <li>alive, because we received heartbeats or changes (so their last alive |
| | | * CSN has been updated to something past the oldest possible CSN), we have |
| | | * enough info to compute medium consistency</li> |
| | | * </ul> |
| | | * In this case, we have enough information to compute medium consistency |
| | | * without waiting any more. |
| | | */ |
| | | private boolean allInitialReplicasAreOfflineOrAlive() |
| | | { |
| | | for (DN baseDN : lastAliveCSNs) |
| | | { |
| | | for (CSN csn : lastAliveCSNs.getServerState(baseDN)) |
| | | { |
| | | if (csn.getTime() == 0) |
| | | if (// oldest possible CSN? |
| | | csn.getTime() == 0 |
| | | // replica is not offline |
| | | && replicasOffline.getCSN(baseDN, csn.getServerId()) == null) |
| | | { |
| | | return false; |
| | | } |
| | |
| | | if (isECLEnabledDomain(baseDN)) |
| | | { |
| | | replicasOffline.update(baseDN, offlineCSN); |
| | | // a replica offline message could also be the very last time |
| | | // we heard from this replica :) |
| | | lastAliveCSNs.update(baseDN, offlineCSN); |
| | | } |
| | | } |
| | | } |
| | |
| | | new ChangeNumberIndexRecord(previousCookie, baseDN, csn); |
| | | changelogDB.getChangeNumberIndexDB().addRecord(record); |
| | | moveForwardMediumConsistencyPoint(csn, baseDN); |
| | | |
| | | // advance the cursor we just read from, |
| | | // success/failure will be checked later |
| | | nextChangeForInsertDBCursor.next(); |
| | | } |
| | | catch (InterruptedException ignored) |
| | | { |
| | |
| | | private void moveForwardMediumConsistencyPoint(final CSN mcCSN, |
| | | final DN mcBaseDN) throws ChangelogException |
| | | { |
| | | boolean callNextOnCursor = true; |
| | | // update, so it becomes the previous cookie for the next change |
| | | mediumConsistencyRUV.update(mcBaseDN, mcCSN); |
| | | mediumConsistency = Pair.of(mcBaseDN, mcCSN); |
| | |
| | | } |
| | | else if (offlineCSN.isOlderThan(mcCSN)) |
| | | { |
| | | /* |
| | | * replica is not back online and the medium consistency point has gone |
| | | * past its last offline time: remove everything known about it: the |
| | | * cursor, the offlineCSN from lastAliveCSNs, and all knowledge of this |
| | | * replica from the medium consistency RUV. |
| | | */ |
| | | removeCursor(mcBaseDN, mcCSN); |
| | | lastAliveCSNs.removeCSN(mcBaseDN, offlineCSN); |
| | | mediumConsistencyRUV.removeCSN(mcBaseDN, offlineCSN); |
| | | Pair<DBCursor<UpdateMsg>, Iterator<Entry<Integer, DBCursor<UpdateMsg>>>> |
| | | pair = getCursor(mcBaseDN, mcCSN.getServerId()); |
| | | Iterator<Entry<Integer, DBCursor<UpdateMsg>>> iter = pair.getSecond(); |
| | | if (iter != null && !iter.hasNext()) |
| | | { |
| | | /* |
| | | * replica is not back online, the medium consistency point has gone past |
| | | * its last offline time, and there are no more changes after the |
| | | * offline CSN in the cursor: remove everything known about it: |
| | | * the cursor, the offlineCSN from lastAliveCSNs, and all knowledge of |
| | | * this replica from the medium consistency RUV. |
| | | */ |
| | | iter.remove(); |
| | | StaticUtils.close(pair.getFirst()); |
| | | resetNextChangeForInsertDBCursor(); |
| | | callNextOnCursor = false; |
| | | lastAliveCSNs.removeCSN(mcBaseDN, offlineCSN); |
| | | mediumConsistencyRUV.removeCSN(mcBaseDN, offlineCSN); |
| | | } |
| | | } |
| | | } |
| | | |
| | | if (callNextOnCursor) |
| | | { |
| | | // advance the cursor we just read from, |
| | | // success/failure will be checked later |
| | | nextChangeForInsertDBCursor.next(); |
| | | } |
| | | } |
| | | |
| | | private void removeAllCursors() |
| | |
| | | newCursors.clear(); |
| | | } |
| | | |
| | | private void removeCursor(final DN baseDN, final CSN csn) |
| | | throws ChangelogException |
| | | private Pair<DBCursor<UpdateMsg>, Iterator<Entry<Integer, DBCursor<UpdateMsg>>>> |
| | | getCursor(final DN baseDN, final int serverId) throws ChangelogException |
| | | { |
| | | for (Entry<DN, Map<Integer, DBCursor<UpdateMsg>>> entry1 |
| | | : allCursors.entrySet()) |
| | |
| | | entry1.getValue().entrySet().iterator(); iter.hasNext();) |
| | | { |
| | | final Entry<Integer, DBCursor<UpdateMsg>> entry2 = iter.next(); |
| | | if (csn.getServerId() == entry2.getKey()) |
| | | if (serverId == entry2.getKey()) |
| | | { |
| | | iter.remove(); |
| | | StaticUtils.close(entry2.getValue()); |
| | | resetNextChangeForInsertDBCursor(); |
| | | return; |
| | | return Pair.of(entry2.getValue(), iter); |
| | | } |
| | | } |
| | | } |
| | | } |
| | | return Pair.empty(); |
| | | } |
| | | |
| | | private boolean recycleExhaustedCursors() throws ChangelogException |