| | |
| | | |
// Name of the change number attribute.
| | | private static final String CHANGE_NUMBER_ATTR = "changeNumber"; |
// Lower-cased form of CHANGE_NUMBER_ATTR. Locale.ROOT keeps the result independent of the JVM default locale.
| | | private static final String CHANGE_NUMBER_ATTR_LC = CHANGE_NUMBER_ATTR.toLowerCase(java.util.Locale.ROOT); |
// Attachment key under which the cookie is stored on a search operation.
| | | private static final String COOKIE_ATTACHMENT = OID_ECL_COOKIE_EXCHANGE_CONTROL + ".cookie"; |
// Attachment key under which the entry sender is stored on a search operation.
| | | private static final String ENTRY_SENDER_ATTACHMENT = OID_ECL_COOKIE_EXCHANGE_CONTROL + ".entrySender"; |
| | | |
| | | /** The set of objectclasses that will be used in the root entry. */ |
| | |
| | | { |
| | | validateProvidedCookie(searchParams); |
| | | |
| | | final MultiDomainServerState cookie = searchParams.cookie; |
| | | final CookieEntrySender entrySender; |
| | | if (isPersistentSearch(searchOperation)) |
| | | { |
| | | // communicate the cookie between the "initial search" phase and the "persistent search" phase |
| | | searchOperation.setAttachment(COOKIE_ATTACHMENT, cookie); |
| | | entrySender = searchOperation.getAttachment(ENTRY_SENDER_ATTACHMENT); |
| | | } |
| | | else |
| | | { |
| | | entrySender = new CookieEntrySender(searchOperation, SearchPhase.INITIAL); |
| | | } |
| | | entrySender.setCookie(searchParams.cookie); |
| | | |
| | | if (!sendBaseChangelogEntry(searchOperation)) |
| | | { // only return the base entry: stop here |
| | |
| | | { |
| | | final ReplicationDomainDB replicationDomainDB = getChangelogDB().getReplicationDomainDB(); |
| | | final MultiDomainDBCursor cursor = replicationDomainDB.getCursorFrom( |
| | | cookie, GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, searchParams.getExcludedBaseDNs()); |
| | | searchParams.cookie, GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, searchParams.getExcludedBaseDNs()); |
| | | replicaUpdatesCursor = new ECLMultiDomainDBCursor(domainPredicate, cursor); |
| | | |
| | | boolean continueSearch = sendCookieEntriesFromCursor(entrySender, replicaUpdatesCursor, cookie); |
| | | final boolean continueSearch = sendCookieEntriesFromCursor(entrySender, replicaUpdatesCursor); |
| | | if (continueSearch) |
| | | { |
| | | entrySender.transitioningToPersistentSearchPhase(); |
| | | sendCookieEntriesFromCursor(entrySender, replicaUpdatesCursor, cookie); |
| | | sendCookieEntriesFromCursor(entrySender, replicaUpdatesCursor); |
| | | } |
| | | } |
| | | finally |
| | |
| | | } |
| | | |
/**
 * Walks the cursor and hands every record, together with its domain base DN, to the entry sender.
 * Iteration ends when the cursor has no more records or as soon as
 * {@code initialSearchSendEntry} returns {@code false}.
 *
 * NOTE(review): two parameter lists (with and without a trailing {@code cookie} parameter) and two
 * {@code initialSearchSendEntry} calls appear back to back below. This looks like the before and
 * after lines of a change shown together; confirm which variant is the current one.
 *
 * @param entrySender the sender each record is passed to
 * @param replicaUpdatesCursor the cursor supplying the update messages and their domain base DN
 * @return {@code false} if the entry sender returned {@code false} for a record,
 *         {@code true} otherwise (including when the cursor yields no record)
 */
| | | private boolean sendCookieEntriesFromCursor(final CookieEntrySender entrySender, |
| | | ECLMultiDomainDBCursor replicaUpdatesCursor, final MultiDomainServerState cookie) throws ChangelogException, |
| | | DirectoryException |
| | | final ECLMultiDomainDBCursor replicaUpdatesCursor) throws ChangelogException, DirectoryException |
| | | { |
| | | boolean continueSearch = true; |
// stop pulling records as soon as the sender reports that the search must not continue
| | | while (continueSearch && replicaUpdatesCursor.next()) |
| | | { |
| | | final UpdateMsg updateMsg = replicaUpdatesCursor.getRecord(); |
| | | final DN domainBaseDN = replicaUpdatesCursor.getData(); |
// NOTE(review): the next two rows assign the same variable; only the first one passes `cookie`
| | | continueSearch = entrySender.initialSearchSendEntry(updateMsg, domainBaseDN, cookie); |
| | | continueSearch = entrySender.initialSearchSendEntry(updateMsg, domainBaseDN); |
| | | } |
| | | return continueSearch; |
| | | } |
| | |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public void registerPersistentSearch(PersistentSearch pSearch) |
| | | public void registerPersistentSearch(PersistentSearch pSearch) throws DirectoryException |
| | | { |
| | | initializeAttachements(pSearch); |
| | | validatePersistentSearch(pSearch); |
| | | initializeEntrySender(pSearch); |
| | | |
| | | if (isCookieBased(pSearch.getSearchOperation())) |
| | | { |
| | |
| | | super.registerPersistentSearch(pSearch); |
| | | } |
| | | |
| | | private void initializeAttachements(PersistentSearch pSearch) |
/**
 * Validates a persistent search at registration time, but only when it is a changes-only search.
 * For such a search this checks the changelog read privilege, builds and optimizes the search
 * parameters from the search operation, then validates the provided cookie.
 * Persistent searches that are not changes-only are left untouched by this method.
 *
 * @param pSearch the persistent search being registered
 * @throws DirectoryException declared by this method; presumably raised by one of the checks
 *         when validation fails (the checks are defined outside this block, to be confirmed)
 */
| | | private void validatePersistentSearch(final PersistentSearch pSearch) throws DirectoryException |
| | | { |
| | | // Validation must be done during registration for changes only persistent searches. |
| | | // Otherwise, when there is an initial search phase, |
| | | // validation is performed by the search() method. |
| | | if (pSearch.isChangesOnly()) |
| | | { |
| | | final SearchOperation searchOperation = pSearch.getSearchOperation(); |
| | | checkChangelogReadPrivilege(searchOperation); |
| | | final SearchParams params = buildSearchParameters(searchOperation); |
| | | // next line also validates some search parameters |
| | | optimizeSearchParameters(params, searchOperation.getBaseDN(), searchOperation.getFilter()); |
| | | validateProvidedCookie(params); |
| | | } |
| | | } |
| | | |
/**
 * Creates the entry sender for a persistent search and stores it on the search operation under
 * {@code ENTRY_SENDER_ATTACHMENT}. The start phase is {@code PERSISTENT} for a changes-only
 * search and {@code INITIAL} otherwise. Cookie based searches get a {@code CookieEntrySender},
 * other searches get a {@code ChangeNumberEntrySender}.
 *
 * NOTE(review): several rows below come in near-identical pairs (two cookie initializations,
 * two {@code setAttachment} calls per branch). This looks like the before and after lines of a
 * change shown together; confirm which rows are current.
 *
 * @param pSearch the persistent search whose search operation receives the entry sender
 */
| | | private void initializeEntrySender(PersistentSearch pSearch) |
| | | { |
| | | final SearchPhase startPhase = pSearch.isChangesOnly() ? SearchPhase.PERSISTENT : SearchPhase.INITIAL; |
| | | |
| | | final SearchOperation searchOp = pSearch.getSearchOperation(); |
| | | if (isCookieBased(searchOp)) |
| | | { |
| | | final CookieEntrySender entrySender = new CookieEntrySender(searchOp, startPhase); |
| | | searchOp.setAttachment(ENTRY_SENDER_ATTACHMENT, entrySender); |
| | | if (pSearch.isChangesOnly()) |
| | | { |
| | | // this changesOnly persistent search will not go through #initialSearch() |
| | | // so we must initialize the cookie here |
// NOTE(review): getNewestCookie(searchOp) is called once per row below, i.e. twice in total
| | | searchOp.setAttachment(COOKIE_ATTACHMENT, getNewestCookie(searchOp)); |
| | | entrySender.setCookie(getNewestCookie(searchOp)); |
| | | } |
// NOTE(review): as written, this second setAttachment replaces the sender stored above with a
// new PERSISTENT-phase sender on which setCookie() was never called - confirm it is stale
| | | searchOp.setAttachment(ENTRY_SENDER_ATTACHMENT, new CookieEntrySender(searchOp, SearchPhase.PERSISTENT)); |
| | | } |
| | | else |
| | | { |
// NOTE(review): both rows write the same attachment key; the second one (startPhase) wins
| | | searchOp.setAttachment(ENTRY_SENDER_ATTACHMENT, new ChangeNumberEntrySender(searchOp, SearchPhase.PERSISTENT)); |
| | | searchOp.setAttachment(ENTRY_SENDER_ATTACHMENT, new ChangeNumberEntrySender(searchOp, startPhase)); |
| | | } |
| | | } |
| | | |
| | |
| | | sendEntryData.finalizeInitialSearch(); |
| | | } |
| | | |
/**
 * Forwards the call to {@code sendEntryData.transitioningToPersistentSearchPhase()}.
 *
 * NOTE(review): the header appears twice below, once {@code public} and once {@code private}.
 * This looks like the before and after lines of a visibility change; confirm which is current.
 */
| | | public void transitioningToPersistentSearchPhase() |
| | | private void transitioningToPersistentSearchPhase() |
| | | { |
| | | sendEntryData.transitioningToPersistentSearchPhase(); |
| | | } |
| | |
| | | private static class CookieEntrySender { |
| | | private final SearchOperation searchOp; |
| | | private final SearchPhase startPhase; |
| | | private MultiDomainServerState cookie; |
| | | private final ConcurrentSkipListMap<Pair<DN, Integer>, SendEntryData<CSN>> replicaIdToSendEntryData = |
| | | new ConcurrentSkipListMap<Pair<DN, Integer>, SendEntryData<CSN>>(Pair.COMPARATOR); |
| | | |
| | |
| | | this.startPhase = startPhase; |
| | | } |
| | | |
| | | public void finalizeInitialSearch() |
/**
 * Stores the given cookie in this object's {@code cookie} field, replacing any previous value.
 *
 * @param cookie the cookie to keep
 */
| | | private void setCookie(MultiDomainServerState cookie) |
| | | { |
| | | this.cookie = cookie; |
| | | } |
| | | |
| | | private void finalizeInitialSearch() |
| | | { |
| | | for (SendEntryData<CSN> sendEntryData : replicaIdToSendEntryData.values()) |
| | | { |
| | |
| | | } |
| | | } |
| | | |
| | | public void transitioningToPersistentSearchPhase() |
| | | private void transitioningToPersistentSearchPhase() |
| | | { |
| | | for (SendEntryData<CSN> sendEntryData : replicaIdToSendEntryData.values()) |
| | | { |
| | |
| | | return data; |
| | | } |
| | | |
/**
 * Handles one update message during the initial search: looks up the {@code SendEntryData} for
 * the base DN and CSN, reports the CSN to it via {@code initialSearchSendsEntry}, updates the
 * cookie, builds an entry from the message and passes it to {@code sendEntryIfMatches}.
 *
 * NOTE(review): the header and the {@code cookieString} declaration each appear twice below
 * (with and without a {@code cookie} argument). This looks like the before and after lines of a
 * change shown together; confirm which variant is current.
 *
 * @param updateMsg the update message to turn into an entry
 * @param baseDN the base DN the update message belongs to
 * @return the value returned by {@code sendEntryIfMatches}
 * @throws DirectoryException declared by this method; the raising call is outside this block
 */
| | | private boolean initialSearchSendEntry(final UpdateMsg updateMsg, final DN baseDN, |
| | | final MultiDomainServerState cookie) throws DirectoryException |
| | | private boolean initialSearchSendEntry(final UpdateMsg updateMsg, final DN baseDN) throws DirectoryException |
| | | { |
| | | final CSN csn = updateMsg.getCSN(); |
| | | final SendEntryData<CSN> sendEntryData = getSendEntryData(baseDN, csn); |
| | | sendEntryData.initialSearchSendsEntry(csn); |
// NOTE(review): same local declared on two consecutive rows; only the first one passes `cookie`
| | | final String cookieString = updateCookie(cookie, baseDN, updateMsg.getCSN()); |
| | | final String cookieString = updateCookie(baseDN, updateMsg.getCSN()); |
| | | final Entry entry = createEntryFromMsg(baseDN, 0, cookieString, updateMsg); |
| | | return sendEntryIfMatches(searchOp, entry, cookieString); |
| | | } |
| | |
| | | if (sendEntryData.persistentSearchCanSendEntry(csn)) |
| | | { |
| | | // multi threaded case: wait for the "initial search" phase to set the cookie |
| | | final MultiDomainServerState cookie = searchOp.getAttachment(COOKIE_ATTACHMENT); |
| | | |
| | | final String cookieString = updateCookie(cookie, baseDN, updateMsg.getCSN()); |
| | | final String cookieString = updateCookie(baseDN, updateMsg.getCSN()); |
| | | final Entry cookieEntry = createEntryFromMsg(baseDN, 0, cookieString, updateMsg); |
| | | // FIXME JNR use this instead of previous line: |
| | | // entry.replaceAttribute(Attributes.create("changelogcookie", cookieString)); |
| | |
| | | } |
| | | } |
| | | |
| | | private String updateCookie(final MultiDomainServerState cookie, DN baseDN, final CSN csn) |
| | | private String updateCookie(DN baseDN, final CSN csn) |
| | | { |
| | | synchronized (cookie) |
| | | { // forbid concurrent updates to the cookie |