| | |
| | | } |
| | | else |
| | | { |
| | | entrySender = new CookieEntrySender(searchOperation); |
| | | entrySender = new CookieEntrySender(searchOperation, SearchPhase.INITIAL); |
| | | } |
| | | |
| | | if (!sendBaseChangelogEntry(searchOperation)) |
| | |
| | | // so we must initialize the cookie here |
| | | searchOp.setAttachment(COOKIE_ATTACHMENT, getNewestCookie(searchOp)); |
| | | } |
| | | searchOp.setAttachment(ENTRY_SENDER_ATTACHMENT, new CookieEntrySender(searchOp)); |
| | | searchOp.setAttachment(ENTRY_SENDER_ATTACHMENT, new CookieEntrySender(searchOp, SearchPhase.PERSISTENT)); |
| | | } |
| | | else |
| | | { |
| | | searchOp.setAttachment(ENTRY_SENDER_ATTACHMENT, new ChangeNumberEntrySender(searchOp)); |
| | | searchOp.setAttachment(ENTRY_SENDER_ATTACHMENT, new ChangeNumberEntrySender(searchOp, SearchPhase.PERSISTENT)); |
| | | } |
| | | } |
| | | |
| | |
| | | } |
| | | else |
| | | { |
| | | entrySender = new ChangeNumberEntrySender(searchOperation); |
| | | entrySender = new ChangeNumberEntrySender(searchOperation, SearchPhase.INITIAL); |
| | | } |
| | | |
| | | DBCursor<ChangeNumberIndexRecord> cnIndexDBCursor = null; |
| | |
| | | private final Object transitioningLock = new Object(); |
| | | private volatile K lastKeySentByInitialSearch; |
| | | |
/**
 * Creates the send-entry state with its search phase set to the supplied starting phase.
 *
 * @param startPhase
 *          the phase stored into {@code searchPhase} when this object is constructed
 */
| | | private SendEntryData(SearchPhase startPhase) |
| | | { |
| | | searchPhase.set(startPhase); |
| | | } |
| | | |
| | | private void finalizeInitialSearch() |
| | | { |
| | | searchPhase.set(SearchPhase.PERSISTENT); |
| | |
| | | private static class ChangeNumberEntrySender |
| | | { |
| | | private final SearchOperation searchOp; |
| | | private final SendEntryData<Long> sendEntryData = new SendEntryData<Long>(); |
| | | private final SendEntryData<Long> sendEntryData; |
| | | |
| | | private ChangeNumberEntrySender(SearchOperation searchOp) |
| | | private ChangeNumberEntrySender(SearchOperation searchOp, SearchPhase startPhase) |
| | | { |
| | | this.searchOp = searchOp; |
| | | this.sendEntryData = new SendEntryData<Long>(startPhase); |
| | | } |
| | | |
| | | private void finalizeInitialSearch() |
| | |
| | | /** Sends entries to clients for cookie-based searches. */ |
| | | private static class CookieEntrySender { |
| | | private final SearchOperation searchOp; |
| | | private final SearchPhase startPhase; |
| | | private final ConcurrentSkipListMap<Pair<DN, Integer>, SendEntryData<CSN>> replicaIdToSendEntryData = |
| | | new ConcurrentSkipListMap<Pair<DN, Integer>, SendEntryData<CSN>>(Pair.COMPARATOR); |
| | | |
| | | private CookieEntrySender(SearchOperation searchOp) |
| | | private CookieEntrySender(SearchOperation searchOp, SearchPhase startPhase) |
| | | { |
| | | this.searchOp = searchOp; |
| | | this.startPhase = startPhase; |
| | | } |
| | | |
| | | public void finalizeInitialSearch() |
| | |
| | | SendEntryData<CSN> data = replicaIdToSendEntryData.get(replicaId); |
| | | if (data == null) |
| | | { |
| | | final SendEntryData<CSN> newData = new SendEntryData<CSN>(); |
| | | final SendEntryData<CSN> newData = new SendEntryData<CSN>(startPhase); |
| | | data = replicaIdToSendEntryData.putIfAbsent(replicaId, newData); |
| | | return data == null ? newData : data; |
| | | } |