| | |
| | | /* |
| | | * mediumConsistencyRUV and lastSeenUpdates must be thread safe, because |
| | | * 1) initialization can happen while the replication server starts receiving |
| | | * updates 2) many updates can happen concurrently. This solution also avoids |
| | | * using a queue that could fill up before we have consumed all its content. |
| | | * updates 2) many updates can happen concurrently. |
| | | */ |
| | | /** |
| | | * Holds the cross domain medium consistency Replication Update Vector for the |
| | |
| | | */ |
| | | private final MultiDomainServerState lastSeenUpdates = |
| | | new MultiDomainServerState(); |
| | | private final MultiDomainServerState replicasOffline = |
| | | new MultiDomainServerState(); |
| | | |
| | | /** |
| | | * Composite cursor across all the replicaDBs for all the replication domains. |
| | |
| | | } |
| | | |
| | | /** |
| | | * Signals a replica went offline. |
| | | * |
| | | * @param baseDN |
| | | * the replica's replication domain |
| | | * @param offlineCSN |
| | | * the serverId and time of the replica that went offline |
| | | */ |
| | | public void replicaOffline(DN baseDN, CSN offlineCSN) |
| | | { |
| | | lastSeenUpdates.update(baseDN, offlineCSN); |
| | | replicasOffline.update(baseDN, offlineCSN); |
| | | tryNotify(baseDN); |
| | | } |
| | | |
| | | /** |
| | | * Notifies the Change number indexer thread if it will be able to do some |
| | | * work. |
| | | */ |
| | |
| | | final CSN mcCSN = mediumConsistencyCSN; |
| | | if (mcCSN != null) |
| | | { |
| | | final CSN lastSeenSameServerId = |
| | | lastSeenUpdates.getCSN(baseDN, mcCSN.getServerId()); |
| | | final int serverId = mcCSN.getServerId(); |
| | | final CSN lastSeenSameServerId = lastSeenUpdates.getCSN(baseDN, serverId); |
| | | return mcCSN.isOlderThan(lastSeenSameServerId); |
| | | } |
| | | return true; |
| | |
| | | // update, so it becomes the previous cookie for the next change |
| | | mediumConsistencyRUV.update(baseDN, csn); |
| | | mediumConsistencyCSN = csn; |
| | | final CSN offlineCSN = replicasOffline.getCSN(baseDN, csn.getServerId()); |
| | | if (offlineCSN != null |
| | | && offlineCSN.isOlderThan(mediumConsistencyCSN) |
| | | // If no new updates has been seen for this replica |
| | | && lastSeenUpdates.removeCSN(baseDN, offlineCSN)) |
| | | { |
| | | removeCursor(baseDN, csn); |
| | | replicasOffline.removeCSN(baseDN, offlineCSN); |
| | | mediumConsistencyRUV.removeCSN(baseDN, offlineCSN); |
| | | } |
| | | } |
| | | |
| | | private void removeCursor(final DN baseDN, final CSN csn) |
| | | { |
| | | for (Entry<DN, Map<Integer, DBCursor<UpdateMsg>>> entry : allCursors |
| | | .entrySet()) |
| | | { |
| | | if (baseDN.equals(entry.getKey())) |
| | | { |
| | | final Set<Integer> serverIds = entry.getValue().keySet(); |
| | | for (Iterator<Integer> iter = serverIds.iterator(); iter.hasNext();) |
| | | { |
| | | final int serverId = iter.next(); |
| | | if (csn.getServerId() == serverId) |
| | | { |
| | | iter.remove(); |
| | | return; |
| | | } |
| | | } |
| | | } |
| | | } |
| | | } |
| | | |
| | | private void createNewCursors() throws ChangelogException |