| | |
| | | /** The replication domain baseDN of this cursor. */ |
| | | private final DN baseDN; |
| | | /** The DB for the replication domain. */ |
| | | private final ReplicationDomainDB domainDB; |
| | | |
| | | /** |
| | | * Replicas registered through {@code addReplicaDB}, keyed by serverId. The value is the |
| | | * CSN to use as the starting point for that replica, or {@link #NULL_CSN} when no CSN |
| | | * was provided. |
| | | */ |
| | | private final ConcurrentSkipListMap<Integer, CSN> newReplicas = new ConcurrentSkipListMap<Integer, CSN>(); |
| | | |
| | | /** |
| | | * Stands in for null CSNs, because ConcurrentSkipListMap does not support null values. |
| | | */ |
| | | private static final CSN NULL_CSN = new CSN(0, 0, 0); |
| | | |
| | | /** Cursor position strategy: indicates at which exact position the cursor must start. */ |
| | | private final PositionStrategy positionStrategy; |
| | | /** Cursor key matching strategy: indicates how the key is matched. */ |
| | | private final KeyMatchingStrategy matchingStrategy; |
| | | |
| | | /** |
| | | * Builds a DomainDBCursor instance. |
| | | * |
| | | * @param baseDN |
| | | * the replication domain baseDN of this cursor |
| | | * @param domainDB |
| | | * the DB for the provided replication domain |
| | | * @param matchingStrategy |
| | | * cursor key matching strategy, which indicates how the key is matched |
| | | * @param positionStrategy |
| | | * cursor position strategy, which indicates at which exact position the |
| | | * cursor must start |
| | | */ |
| | | public DomainDBCursor(final DN baseDN, final ReplicationDomainDB domainDB, final KeyMatchingStrategy matchingStrategy, |
| | | final PositionStrategy positionStrategy) |
| | | { |
| | | this.baseDN = baseDN; |
| | | this.domainDB = domainDB; |
| | | this.matchingStrategy = matchingStrategy; |
| | | this.positionStrategy = positionStrategy; |
| | | } |
| | | |
| | | /** |
| | | * Registers a replica, identified by its serverId, together with the CSN to use as its |
| | | * starting point. |
| | | * |
| | | * @param serverId |
| | | * the serverId of the replica |
| | | * @param startCSN |
| | | * the CSN to use as a starting point; when null, the NULL_CSN placeholder is stored |
| | | * instead |
| | | */ |
| | | public void addReplicaDB(int serverId, CSN startCSN) |
| | | { |
| | | // putIfAbsent leaves an already registered serverId untouched: only the CSN registered |
| | | // first is kept (intended to be the oldest one, i.e. the new cursor's starting point). |
| | | // A null CSN is replaced by NULL_CSN because the map does not support null values. |
| | | newReplicas.putIfAbsent(serverId, startCSN != null ? startCSN : NULL_CSN); |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | |
| | | final CSN csn = pair.getValue(); |
| | | final CSN startCSN = !NULL_CSN.equals(csn) ? csn : null; |
| | | final DBCursor<UpdateMsg> cursor = |
| | | domainDB.getCursorFrom(baseDN, serverId, startCSN, positionStrategy); |
| | | domainDB.getCursorFrom(baseDN, serverId, startCSN, matchingStrategy, positionStrategy); |
| | | addCursor(cursor, null); |
| | | iter.remove(); |
| | | } |