| | |
| | | package org.opends.server.backends; |
| | | |
| | | import java.text.SimpleDateFormat; |
| | | import java.util.ArrayList; |
| | | import java.util.Arrays; |
| | | import java.util.Collection; |
| | | import java.util.Collections; |
| | | import java.util.Date; |
| | |
| | | import java.util.Map; |
| | | import java.util.Set; |
| | | import java.util.TimeZone; |
| | | import java.util.concurrent.ConcurrentLinkedQueue; |
| | | import java.util.concurrent.ConcurrentSkipListMap; |
| | | import java.util.concurrent.atomic.AtomicReference; |
| | | |
| | | import org.opends.messages.Category; |
| | | import org.opends.messages.Message; |
| | |
| | | import org.opends.server.types.WritabilityMode; |
| | | import org.opends.server.util.StaticUtils; |
| | | |
| | | import com.forgerock.opendj.util.Pair; |
| | | |
| | | import static org.opends.messages.BackendMessages.*; |
| | | import static org.opends.messages.ReplicationMessages.*; |
| | | import static org.opends.server.config.ConfigConstants.*; |
| | |
| | | * request. The cookie provided in the control is used to retrieve entries from |
| | | * the ReplicaDBs. The <code>changeNumber</code> attribute is not returned with |
| | | * the entries.</li> |
| | | * <li>Draft compatibility mode: when no "ECL Cookie Exchange Control" is provided |
| | | * with the request. The entries are retrieved using the ChangeNumberIndexDB (or |
| | | * DraftDB, hence the name) and their attributes are set with the information |
| | | * from the ReplicasDBs. The <code>changeNumber</code> attribute value is set |
| | | * from the content of ChangeNumberIndexDB.</li> |
| | | * <li>Change number mode: when no "ECL Cookie Exchange Control" is provided |
| | | * with the request. The entries are retrieved using the ChangeNumberIndexDB and |
| | | * their attributes are set with the information from the ReplicasDBs. The |
| | | * <code>changeNumber</code> attribute value is set from the content of |
| | | * ChangeNumberIndexDB.</li> |
| | | * </ul> |
| | | * <h3>Searches flow</h3> |
| | | * <p> |
| | |
| | | * (once, single threaded),</li> |
| | | * <li> |
| | | * {@link ChangelogBackend#search(SearchOperation)} (once, single threaded)</li> |
| | | * <li> |
| | | * {@link ChangelogBackend#notifyEntryAdded(DN, long, String, UpdateMsg)} |
| | | * (multiple times, multi threaded)</li> |
| | | * <li>{@link ChangelogBackend#notify*EntryAdded()} (multiple times, multi |
| | | * threaded)</li> |
| | | * </ol> |
| | | * </li> |
| | | * <li>Persistent searches with <code>changesOnly=true</code> go through: |
| | |
| | | * <li>{@link ChangelogBackend#registerPersistentSearch(PersistentSearch)} |
| | | * (once, single threaded)</li> |
| | | * <li> |
| | | * {@link ChangelogBackend#notifyEntryAdded(DN, long, String, UpdateMsg)} |
| | | * (multiple times, multi threaded)</li> |
| | | * {@link ChangelogBackend#notify*EntryAdded()} (multiple times, multi |
| | | * threaded)</li> |
| | | * </ol> |
| | | * </li> |
| | | * </ul> |
| | |
| | | |
| | | private static final String CHANGE_NUMBER_ATTR = "changeNumber"; |
| | | private static final String CHANGE_NUMBER_ATTR_LC = CHANGE_NUMBER_ATTR.toLowerCase(); |
| | | private static final String COOKIE_ATTACHMENT = OID_ECL_COOKIE_EXCHANGE_CONTROL + ".cookie"; |
| | | private static final String ENTRY_SENDER_ATTACHMENT = OID_ECL_COOKIE_EXCHANGE_CONTROL + ".entrySender"; |
| | | |
| | | /** The set of objectclasses that will be used in root entry. */ |
| | | private static final Map<ObjectClass, String> |
| | |
| | | private final ReplicationServer replicationServer; |
| | | private final ECLEnabledDomainPredicate domainPredicate; |
| | | |
| | | /** The set of cookie-based persistent searches registered with this backend. */ |
| | | private final ConcurrentLinkedQueue<PersistentSearch> cookieBasedPersistentSearches = |
| | | new ConcurrentLinkedQueue<PersistentSearch>(); |
| | | /** |
| | | * The set of change number-based persistent searches registered with this |
| | | * backend. |
| | | */ |
| | | private final ConcurrentLinkedQueue<PersistentSearch> changeNumberBasedPersistentSearches = |
| | | new ConcurrentLinkedQueue<PersistentSearch>(); |
| | | |
| | | /** |
| | | * Creates a new backend with the provided replication server. |
| | | * |
| | |
| | | } |
| | | |
| | | /** |
| | | * Notifies persistent searches of this backend that a new entry was added to it. |
| | | * Notifies persistent searches of this backend that a new cookie entry was added to it. |
| | | * <p> |
| | | * Note: This method correspond to the "persistent search" phase. |
| | | * It is executed multiple times per persistent search, multi-threaded, until the persistent search is cancelled. |
| | | * <p> |
| | | * This method must only be called after the provided data have been persisted to disk. |
| | | * |
| | | * @param baseDN |
| | | * the baseDN of the newly added entry. |
| | | * @param updateMsg |
| | | * the update message of the newly added entry |
| | | * @throws ChangelogException |
| | | * If a problem occurs while notifying of the newly added entry. |
| | | */ |
| | | public void notifyCookieEntryAdded(DN baseDN, UpdateMsg updateMsg) throws ChangelogException |
| | | { |
| | | if (!(updateMsg instanceof LDAPUpdateMsg)) |
| | | { |
| | | return; |
| | | } |
| | | |
| | | try |
| | | { |
| | | for (PersistentSearch pSearch : cookieBasedPersistentSearches) |
| | | { |
| | | final SearchOperation searchOp = pSearch.getSearchOperation(); |
| | | final CookieEntrySender entrySender = searchOp.getAttachment(ENTRY_SENDER_ATTACHMENT); |
| | | entrySender.persistentSearchSendEntry(baseDN, updateMsg); |
| | | } |
| | | } |
| | | catch (DirectoryException e) |
| | | { |
| | | throw new ChangelogException(e.getMessageObject(), e); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Notifies persistent searches of this backend that a new change number entry was added to it. |
| | | * <p> |
| | | * Note: This method correspond to the "persistent search" phase. |
| | | * It is executed multiple times per persistent search, multi-threaded, until the persistent search is cancelled. |
| | | * <p> |
| | | * This method must only be called after the provided data have been persisted to disk. |
| | | * |
| | | * @param baseDN |
| | | * the baseDN of the newly added entry. |
| | |
| | | * @throws ChangelogException |
| | | * If a problem occurs while notifying of the newly added entry. |
| | | */ |
| | | public void notifyEntryAdded(DN baseDN, long changeNumber, String cookieString, UpdateMsg updateMsg) |
| | | public void notifyChangeNumberEntryAdded(DN baseDN, long changeNumber, String cookieString, UpdateMsg updateMsg) |
| | | throws ChangelogException |
| | | { |
| | | final boolean isCookieEntry = changeNumber <= 0; |
| | | final List<SearchOperation> pSearchOps = getPersistentSearches(isCookieEntry); |
| | | if (pSearchOps.isEmpty() || !(updateMsg instanceof LDAPUpdateMsg)) |
| | | if (!(updateMsg instanceof LDAPUpdateMsg)) |
| | | { |
| | | return; |
| | | } |
| | | |
| | | try |
| | | { |
| | | final Entry entry = createEntryFromMsg(baseDN, changeNumber, cookieString, updateMsg); |
| | | for (SearchOperation pSearchOp : pSearchOps) |
| | | // changeNumber entry can be shared with multiple persistent searches |
| | | final Entry changeNumberEntry = createEntryFromMsg(baseDN, changeNumber, cookieString, updateMsg); |
| | | for (PersistentSearch pSearch : changeNumberBasedPersistentSearches) |
| | | { |
| | | final MultiDomainServerState cookie = (MultiDomainServerState) |
| | | pSearchOp.getAttachment(OID_ECL_COOKIE_EXCHANGE_CONTROL); |
| | | |
| | | // when returning changesOnly, the first incoming update must return |
| | | // the base entry before any other changes, |
| | | // so force sending now, when protected by the synchronized block |
| | | if (isCookieEntry) |
| | | { // cookie based search |
| | | final String cookieStr; |
| | | synchronized (cookie) |
| | | { // forbid concurrent updates to the cookie |
| | | cookie.update(baseDN, updateMsg.getCSN()); |
| | | cookieStr = cookie.toString(); |
| | | } |
| | | final Entry entry2 = createEntryFromMsg(baseDN, changeNumber, cookieStr, updateMsg); |
| | | // FIXME JNR use this instead of previous line: |
| | | // entry.replaceAttribute(Attributes.create("changelogcookie", cookieStr)); |
| | | sendEntryIfMatches(pSearchOp, entry2, cookieStr); |
| | | } |
| | | else |
| | | { // draft changeNumber search |
| | | sendEntryIfMatches(pSearchOp, entry, null); |
| | | } |
| | | final SearchOperation searchOp = pSearch.getSearchOperation(); |
| | | final ChangeNumberEntrySender entrySender = searchOp.getAttachment(ENTRY_SENDER_ATTACHMENT); |
| | | entrySender.persistentSearchSendEntry(changeNumber, changeNumberEntry); |
| | | } |
| | | } |
| | | catch (DirectoryException e) |
| | |
| | | } |
| | | } |
| | | |
| | | private List<SearchOperation> getPersistentSearches(boolean wantCookieBasedSearch) |
| | | { |
| | | final List<SearchOperation> results = new ArrayList<SearchOperation>(); |
| | | for (PersistentSearch pSearch : getPersistentSearches()) |
| | | { |
| | | final SearchOperation op = pSearch.getSearchOperation(); |
| | | if (wantCookieBasedSearch == isCookieBased(op)) |
| | | { |
| | | results.add(op); |
| | | } |
| | | } |
| | | return results; |
| | | } |
| | | |
| | | private boolean isCookieBased(final SearchOperation searchOp) |
| | | { |
| | | for (Control c : searchOp.getRequestControls()) |
| | |
| | | { |
| | | validateProvidedCookie(searchParams); |
| | | |
| | | // send the base changelog entry immediately even for changesOnly=false persistent searches |
| | | if (!sendBaseChangelogEntry(searchOperation)) |
| | | { |
| | | // only return the base entry: stop here |
| | | return; |
| | | } |
| | | |
| | | final MultiDomainServerState cookie = searchParams.cookie; |
| | | final CookieEntrySender entrySender; |
| | | if (isPersistentSearch(searchOperation)) |
| | | { |
| | | // communicate the cookie between the "initial search" phase and the "persistent search" phase |
| | | searchOperation.setAttachment(OID_ECL_COOKIE_EXCHANGE_CONTROL, searchParams.cookie); |
| | | searchOperation.setAttachment(COOKIE_ATTACHMENT, cookie); |
| | | entrySender = searchOperation.getAttachment(ENTRY_SENDER_ATTACHMENT); |
| | | } |
| | | else |
| | | { |
| | | entrySender = new CookieEntrySender(searchOperation); |
| | | } |
| | | |
| | | if (!sendBaseChangelogEntry(searchOperation)) |
| | | { // only return the base entry: stop here |
| | | return; |
| | | } |
| | | |
| | | ECLMultiDomainDBCursor replicaUpdatesCursor = null; |
| | |
| | | { |
| | | final ReplicationDomainDB replicationDomainDB = getChangelogDB().getReplicationDomainDB(); |
| | | final MultiDomainDBCursor cursor = replicationDomainDB.getCursorFrom( |
| | | searchParams.cookie, AFTER_MATCHING_KEY, searchParams.getExcludedBaseDNs()); |
| | | cookie, AFTER_MATCHING_KEY, searchParams.getExcludedBaseDNs()); |
| | | replicaUpdatesCursor = new ECLMultiDomainDBCursor(domainPredicate, cursor); |
| | | |
| | | boolean continueSearch = true; |
| | | while (continueSearch && replicaUpdatesCursor.next()) |
| | | boolean continueSearch = sendCookieEntriesFromCursor(entrySender, replicaUpdatesCursor, cookie); |
| | | if (continueSearch) |
| | | { |
| | | // Handle the update message |
| | | final UpdateMsg updateMsg = replicaUpdatesCursor.getRecord(); |
| | | final DN domainBaseDN = replicaUpdatesCursor.getData(); |
| | | searchParams.cookie.update(domainBaseDN, updateMsg.getCSN()); |
| | | final String cookieString = searchParams.cookie.toString(); |
| | | |
| | | final Entry entry = createEntryFromMsg(domainBaseDN, 0, cookieString, updateMsg); |
| | | continueSearch = sendEntryIfMatches(searchOperation, entry, cookieString); |
| | | entrySender.transitioningToPersistentSearchPhase(); |
| | | sendCookieEntriesFromCursor(entrySender, replicaUpdatesCursor, cookie); |
| | | } |
| | | } |
| | | finally |
| | | { |
| | | entrySender.finalizeInitialSearch(); |
| | | StaticUtils.close(replicaUpdatesCursor); |
| | | } |
| | | } |
| | | |
| | | private boolean sendCookieEntriesFromCursor(final CookieEntrySender entrySender, |
| | | ECLMultiDomainDBCursor replicaUpdatesCursor, final MultiDomainServerState cookie) throws ChangelogException, |
| | | DirectoryException |
| | | { |
| | | boolean continueSearch = true; |
| | | while (continueSearch && replicaUpdatesCursor.next()) |
| | | { |
| | | final UpdateMsg updateMsg = replicaUpdatesCursor.getRecord(); |
| | | final DN domainBaseDN = replicaUpdatesCursor.getData(); |
| | | continueSearch = entrySender.initialSearchSendEntry(updateMsg, domainBaseDN, cookie); |
| | | } |
| | | return continueSearch; |
| | | } |
| | | |
| | | private boolean isPersistentSearch(SearchOperation op) |
| | | { |
| | | for (PersistentSearch pSearch : getPersistentSearches()) |
| | |
| | | @Override |
| | | public void registerPersistentSearch(PersistentSearch pSearch) |
| | | { |
| | | final SearchOperation searchOp = pSearch.getSearchOperation(); |
| | | if (pSearch.isChangesOnly()) |
| | | initializeAttachements(pSearch); |
| | | |
| | | if (isCookieBased(pSearch.getSearchOperation())) |
| | | { |
| | | // this changesOnly persistent search will not go through #search0() down below |
| | | // so we must initialize the entrySender here and never return the base entry |
| | | searchOp.setAttachment(OID_ECL_COOKIE_EXCHANGE_CONTROL, getNewestCookie(searchOp)); |
| | | cookieBasedPersistentSearches.add(pSearch); |
| | | } |
| | | else |
| | | { |
| | | changeNumberBasedPersistentSearches.add(pSearch); |
| | | } |
| | | super.registerPersistentSearch(pSearch); |
| | | } |
| | | |
| | | super.registerPersistentSearch(pSearch); |
| | | private void initializeAttachements(PersistentSearch pSearch) |
| | | { |
| | | final SearchOperation searchOp = pSearch.getSearchOperation(); |
| | | if (isCookieBased(searchOp)) |
| | | { |
| | | if (pSearch.isChangesOnly()) |
| | | { |
| | | // this changesOnly persistent search will not go through #initialSearch() |
| | | // so we must initialize the cookie here |
| | | searchOp.setAttachment(COOKIE_ATTACHMENT, getNewestCookie(searchOp)); |
| | | } |
| | | searchOp.setAttachment(ENTRY_SENDER_ATTACHMENT, new CookieEntrySender(searchOp)); |
| | | } |
| | | else |
| | | { |
| | | searchOp.setAttachment(ENTRY_SENDER_ATTACHMENT, new ChangeNumberEntrySender(searchOp)); |
| | | } |
| | | } |
| | | |
| | | private MultiDomainServerState getNewestCookie(SearchOperation searchOp) |
| | |
| | | */ |
| | | private void validateProvidedCookie(final SearchParams searchParams) throws DirectoryException |
| | | { |
| | | final MultiDomainServerState state = searchParams.cookie; |
| | | if (state != null && !state.isEmpty()) |
| | | final MultiDomainServerState cookie = searchParams.cookie; |
| | | if (cookie != null && !cookie.isEmpty()) |
| | | { |
| | | replicationServer.validateCookie(state, searchParams.getExcludedBaseDNs()); |
| | | replicationServer.validateCookie(cookie, searchParams.getExcludedBaseDNs()); |
| | | } |
| | | } |
| | | |
| | |
| | | // "initial search" phase must return the base entry immediately |
| | | sendBaseChangelogEntry(searchOperation); |
| | | |
| | | final ChangeNumberEntrySender entrySender; |
| | | if (isPersistentSearch(searchOperation)) |
| | | { |
| | | entrySender = searchOperation.getAttachment(ENTRY_SENDER_ATTACHMENT); |
| | | } |
| | | else |
| | | { |
| | | entrySender = new ChangeNumberEntrySender(searchOperation); |
| | | } |
| | | |
| | | DBCursor<ChangeNumberIndexRecord> cnIndexDBCursor = null; |
| | | MultiDomainDBCursor replicaUpdatesCursor = null; |
| | | final AtomicReference<MultiDomainDBCursor> replicaUpdatesCursor = new AtomicReference<MultiDomainDBCursor>(); |
| | | try |
| | | { |
| | | cnIndexDBCursor = getCNIndexDBCursor(params.lowestChangeNumber); |
| | | final boolean continueSearch = |
| | | sendChangeNumberEntriesFromCursors(entrySender, params, cnIndexDBCursor, replicaUpdatesCursor); |
| | | if (continueSearch) |
| | | { |
| | | entrySender.transitioningToPersistentSearchPhase(); |
| | | sendChangeNumberEntriesFromCursors(entrySender, params, cnIndexDBCursor, replicaUpdatesCursor); |
| | | } |
| | | } |
| | | finally |
| | | { |
| | | entrySender.finalizeInitialSearch(); |
| | | StaticUtils.close(cnIndexDBCursor, replicaUpdatesCursor.get()); |
| | | } |
| | | } |
| | | |
| | | private boolean sendChangeNumberEntriesFromCursors(final ChangeNumberEntrySender entrySender, |
| | | final SearchParams params, DBCursor<ChangeNumberIndexRecord> cnIndexDBCursor, |
| | | AtomicReference<MultiDomainDBCursor> replicaUpdatesCursor) throws ChangelogException, DirectoryException |
| | | { |
| | | boolean continueSearch = true; |
| | | while (continueSearch && cnIndexDBCursor.next()) |
| | | { |
| | | // Handle the current cnIndex record |
| | | final ChangeNumberIndexRecord cnIndexRecord = cnIndexDBCursor.getRecord(); |
| | | if (replicaUpdatesCursor == null) |
| | | if (replicaUpdatesCursor.get() == null) |
| | | { |
| | | replicaUpdatesCursor = initializeReplicaUpdatesCursor(cnIndexRecord); |
| | | replicaUpdatesCursor.set(initializeReplicaUpdatesCursor(cnIndexRecord)); |
| | | } |
| | | continueSearch = params.changeNumberIsInRange(cnIndexRecord.getChangeNumber()); |
| | | if (continueSearch) |
| | | { |
| | | UpdateMsg updateMsg = findReplicaUpdateMessage(cnIndexRecord, replicaUpdatesCursor); |
| | | final UpdateMsg updateMsg = findReplicaUpdateMessage(cnIndexRecord, replicaUpdatesCursor.get()); |
| | | if (updateMsg != null) |
| | | { |
| | | continueSearch = sendEntryForUpdateMessage(searchOperation, cnIndexRecord, updateMsg); |
| | | replicaUpdatesCursor.next(); |
| | | continueSearch = entrySender.initialSearchSendEntry(cnIndexRecord, updateMsg); |
| | | replicaUpdatesCursor.get().next(); |
| | | } |
| | | } |
| | | } |
| | | } |
| | | finally |
| | | { |
| | | StaticUtils.close(cnIndexDBCursor, replicaUpdatesCursor); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * @return {@code true} if search should continue, {@code false} otherwise |
| | | */ |
| | | private boolean sendEntryForUpdateMessage(SearchOperation searchOperation, |
| | | ChangeNumberIndexRecord cnIndexRecord, UpdateMsg updateMsg) throws DirectoryException |
| | | { |
| | | final DN baseDN = cnIndexRecord.getBaseDN(); |
| | | final MultiDomainServerState cookie = new MultiDomainServerState(cnIndexRecord.getPreviousCookie()); |
| | | cookie.update(baseDN, cnIndexRecord.getCSN()); |
| | | final String cookieString = cookie.toString(); |
| | | |
| | | final Entry entry = createEntryFromMsg(baseDN, cnIndexRecord.getChangeNumber(), cookieString, updateMsg); |
| | | return sendEntryIfMatches(searchOperation, entry, null); |
| | | return continueSearch; |
| | | } |
| | | |
| | | private MultiDomainDBCursor initializeReplicaUpdatesCursor( |
| | |
| | | /** |
| | | * Creates a changelog entry. |
| | | */ |
| | | private Entry createEntryFromMsg(final DN baseDN, final long changeNumber, final String cookie, final UpdateMsg msg) |
| | | throws DirectoryException |
| | | private static Entry createEntryFromMsg(final DN baseDN, final long changeNumber, final String cookie, |
| | | final UpdateMsg msg) throws DirectoryException |
| | | { |
| | | if (msg instanceof AddMsg) |
| | | { |
| | |
| | | * change initiators name if available which is contained in the creatorsName |
| | | * attribute. |
| | | */ |
| | | private Entry createAddMsg(final DN baseDN, final long changeNumber, final String cookie, final UpdateMsg msg) |
| | | private static Entry createAddMsg(final DN baseDN, final long changeNumber, final String cookie, final UpdateMsg msg) |
| | | throws DirectoryException |
| | | { |
| | | final AddMsg addMsg = (AddMsg) msg; |
| | |
| | | * out change initiators name if available which is contained in the |
| | | * modifiersName attribute. |
| | | */ |
| | | private Entry createModifyMsg(final DN baseDN, final long changeNumber, final String cookie, final UpdateMsg msg) |
| | | throws DirectoryException |
| | | private static Entry createModifyMsg(final DN baseDN, final long changeNumber, final String cookie, |
| | | final UpdateMsg msg) throws DirectoryException |
| | | { |
| | | final ModifyCommonMsg modifyMsg = (ModifyCommonMsg) msg; |
| | | String changeInitiatorsName = null; |
| | |
| | | * @param entryDN |
| | | * DN of original entry |
| | | */ |
| | | private void logEncodingMessageError(String messageType, DN entryDN, Exception exception) |
| | | private static void logEncodingMessageError(String messageType, DN entryDN, Exception exception) |
| | | { |
| | | TRACER.debugCaught(DebugLogLevel.ERROR, exception); |
| | | logError(Message.raw(Category.SYNC, Severity.MILD_ERROR, |
| | |
| | | String dnString; |
| | | if (changeNumber > 0) |
| | | { |
| | | // Draft compat mode |
| | | // change number mode |
| | | dnString = "changeNumber=" + changeNumber + "," + DN_EXTERNAL_CHANGELOG_ROOT; |
| | | } |
| | | else |
| | |
| | | * |
| | | * @return {@code true} if search should continue, {@code false} otherwise |
| | | */ |
| | | private boolean sendEntryIfMatches(SearchOperation searchOp, Entry entry, String cookie) throws DirectoryException |
| | | private static boolean sendEntryIfMatches(SearchOperation searchOp, Entry entry, String cookie) |
| | | throws DirectoryException |
| | | { |
| | | if (matchBaseAndScopeAndFilter(searchOp, entry)) |
| | | { |
| | |
| | | } |
| | | |
| | | /** Indicates if the provided entry matches the filter, base and scope. */ |
| | | private boolean matchBaseAndScopeAndFilter(SearchOperation searchOp, Entry entry) throws DirectoryException |
| | | private static boolean matchBaseAndScopeAndFilter(SearchOperation searchOp, Entry entry) throws DirectoryException |
| | | { |
| | | return entry.matchesBaseAndScope(searchOp.getBaseDN(), searchOp.getScope()) |
| | | && searchOp.getFilter().matchesEntry(entry); |
| | | } |
| | | |
| | | private List<Control> getControls(String cookie) |
| | | private static List<Control> getControls(String cookie) |
| | | { |
| | | if (cookie != null) |
| | | { |
| | | Control c = new EntryChangelogNotificationControl(true, cookie); |
| | | return Arrays.asList(c); |
| | | final Control c = new EntryChangelogNotificationControl(true, cookie); |
| | | return Collections.singletonList(c); |
| | | } |
| | | return Collections.emptyList(); |
| | | } |
| | |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Describes the current search phase. |
| | | */ |
| | | private enum SearchPhase |
| | | { |
| | | /** |
| | | * "Initial search" phase. The "initial search" phase is running |
| | | * concurrently. All update notifications are ignored. |
| | | */ |
| | | INITIAL, |
| | | /** |
| | | * Transitioning from the "initial search" phase to the "persistent search" |
| | | * phase. "Initial search" phase has finished reading from the DB. It now |
| | | * verifies if any more updates have been persisted to the DB since stopping |
| | | * and send them. All update notifications are blocked. |
| | | */ |
| | | TRANSITIONING, |
| | | /** |
| | | * "Persistent search" phase. "Initial search" phase has completed. All |
| | | * update notifications are published. |
| | | */ |
| | | PERSISTENT; |
| | | } |
| | | |
| | | /** |
| | | * Contains data to ensure that the same change is not sent twice to clients |
| | | * because of race conditions between the "initial search" phase and the |
| | | * "persistent search" phase. |
| | | */ |
| | | private static class SendEntryData<K extends Comparable<K>> |
| | | { |
| | | private final AtomicReference<SearchPhase> searchPhase = new AtomicReference<SearchPhase>(SearchPhase.INITIAL); |
| | | private final Object transitioningLock = new Object(); |
| | | private volatile K lastKeySentByInitialSearch; |
| | | |
| | | private void finalizeInitialSearch() |
| | | { |
| | | searchPhase.set(SearchPhase.PERSISTENT); |
| | | synchronized (transitioningLock) |
| | | { // initial search phase has completed, release all persistent searches |
| | | transitioningLock.notifyAll(); |
| | | } |
| | | } |
| | | |
| | | public void transitioningToPersistentSearchPhase() |
| | | { |
| | | searchPhase.set(SearchPhase.TRANSITIONING); |
| | | } |
| | | |
| | | private void initialSearchSendsEntry(final K key) |
| | | { |
| | | lastKeySentByInitialSearch = key; |
| | | } |
| | | |
| | | private boolean persistentSearchCanSendEntry(K key) |
| | | { |
| | | final SearchPhase stateValue = searchPhase.get(); |
| | | switch (stateValue) |
| | | { |
| | | case INITIAL: |
| | | return false; |
| | | case TRANSITIONING: |
| | | synchronized (transitioningLock) |
| | | { |
| | | while (SearchPhase.TRANSITIONING.equals(searchPhase.get())) |
| | | { |
| | | // "initial search" phase is over, and is now verifying whether new |
| | | // changes have been published to the DB. |
| | | // Wait for this check to complete |
| | | try |
| | | { |
| | | transitioningLock.wait(); |
| | | } |
| | | catch (InterruptedException e) |
| | | { |
| | | Thread.currentThread().interrupt(); |
| | | // Shutdown must have been called. Stop sending entries. |
| | | return false; |
| | | } |
| | | } |
| | | } |
| | | return key.compareTo(lastKeySentByInitialSearch) > 0; |
| | | case PERSISTENT: |
| | | return true; |
| | | default: |
| | | throw new RuntimeException("Not implemented for " + stateValue); |
| | | } |
| | | } |
| | | } |
| | | |
| | | /** Sends entries to clients for change number searches. */ |
| | | private static class ChangeNumberEntrySender |
| | | { |
| | | private final SearchOperation searchOp; |
| | | private final SendEntryData<Long> sendEntryData = new SendEntryData<Long>(); |
| | | |
| | | private ChangeNumberEntrySender(SearchOperation searchOp) |
| | | { |
| | | this.searchOp = searchOp; |
| | | } |
| | | |
| | | private void finalizeInitialSearch() |
| | | { |
| | | sendEntryData.finalizeInitialSearch(); |
| | | } |
| | | |
| | | public void transitioningToPersistentSearchPhase() |
| | | { |
| | | sendEntryData.transitioningToPersistentSearchPhase(); |
| | | } |
| | | |
| | | /** |
| | | * @return {@code true} if search should continue, {@code false} otherwise |
| | | */ |
| | | private boolean initialSearchSendEntry(ChangeNumberIndexRecord cnIndexRecord, UpdateMsg updateMsg) |
| | | throws DirectoryException |
| | | { |
| | | final DN baseDN = cnIndexRecord.getBaseDN(); |
| | | final MultiDomainServerState cookie = new MultiDomainServerState(cnIndexRecord.getPreviousCookie()); |
| | | cookie.update(baseDN, cnIndexRecord.getCSN()); |
| | | final String cookieString = cookie.toString(); |
| | | |
| | | sendEntryData.initialSearchSendsEntry(cnIndexRecord.getChangeNumber()); |
| | | final Entry entry = createEntryFromMsg(baseDN, cnIndexRecord.getChangeNumber(), cookieString, updateMsg); |
| | | return sendEntryIfMatches(searchOp, entry, null); |
| | | } |
| | | |
| | | private void persistentSearchSendEntry(long changeNumber, Entry entry) throws DirectoryException |
| | | { |
| | | if (sendEntryData.persistentSearchCanSendEntry(changeNumber)) |
| | | { |
| | | sendEntryIfMatches(searchOp, entry, null); |
| | | } |
| | | } |
| | | } |
| | | |
| | | /** Sends entries to clients for cookie-based searches. */ |
| | | private static class CookieEntrySender { |
| | | private final SearchOperation searchOp; |
| | | private final ConcurrentSkipListMap<Pair<DN, Integer>, SendEntryData<CSN>> replicaIdToSendEntryData = |
| | | new ConcurrentSkipListMap<Pair<DN, Integer>, SendEntryData<CSN>>(Pair.COMPARATOR); |
| | | |
| | | private CookieEntrySender(SearchOperation searchOp) |
| | | { |
| | | this.searchOp = searchOp; |
| | | } |
| | | |
| | | public void finalizeInitialSearch() |
| | | { |
| | | for (SendEntryData<CSN> sendEntryData : replicaIdToSendEntryData.values()) |
| | | { |
| | | sendEntryData.finalizeInitialSearch(); |
| | | } |
| | | } |
| | | |
| | | public void transitioningToPersistentSearchPhase() |
| | | { |
| | | for (SendEntryData<CSN> sendEntryData : replicaIdToSendEntryData.values()) |
| | | { |
| | | sendEntryData.transitioningToPersistentSearchPhase(); |
| | | } |
| | | } |
| | | |
| | | private SendEntryData<CSN> getSendEntryData(DN baseDN, CSN csn) |
| | | { |
| | | final Pair<DN, Integer> replicaId = Pair.of(baseDN, csn.getServerId()); |
| | | SendEntryData<CSN> data = replicaIdToSendEntryData.get(replicaId); |
| | | if (data == null) |
| | | { |
| | | final SendEntryData<CSN> newData = new SendEntryData<CSN>(); |
| | | data = replicaIdToSendEntryData.putIfAbsent(replicaId, newData); |
| | | return data == null ? newData : data; |
| | | } |
| | | return data; |
| | | } |
| | | |
| | | private boolean initialSearchSendEntry(final UpdateMsg updateMsg, final DN baseDN, |
| | | final MultiDomainServerState cookie) throws DirectoryException |
| | | { |
| | | final CSN csn = updateMsg.getCSN(); |
| | | final SendEntryData<CSN> sendEntryData = getSendEntryData(baseDN, csn); |
| | | sendEntryData.initialSearchSendsEntry(csn); |
| | | final String cookieString = updateCookie(cookie, baseDN, updateMsg.getCSN()); |
| | | final Entry entry = createEntryFromMsg(baseDN, 0, cookieString, updateMsg); |
| | | return sendEntryIfMatches(searchOp, entry, cookieString); |
| | | } |
| | | |
| | | private void persistentSearchSendEntry(DN baseDN, UpdateMsg updateMsg) |
| | | throws DirectoryException |
| | | { |
| | | final CSN csn = updateMsg.getCSN(); |
| | | final SendEntryData<CSN> sendEntryData = getSendEntryData(baseDN, csn); |
| | | if (sendEntryData.persistentSearchCanSendEntry(csn)) |
| | | { |
| | | // multi threaded case: wait for the "initial search" phase to set the cookie |
| | | final MultiDomainServerState cookie = searchOp.getAttachment(COOKIE_ATTACHMENT); |
| | | |
| | | final String cookieString = updateCookie(cookie, baseDN, updateMsg.getCSN()); |
| | | final Entry cookieEntry = createEntryFromMsg(baseDN, 0, cookieString, updateMsg); |
| | | // FIXME JNR use this instead of previous line: |
| | | // entry.replaceAttribute(Attributes.create("changelogcookie", cookieString)); |
| | | sendEntryIfMatches(searchOp, cookieEntry, cookieString); |
| | | } |
| | | } |
| | | |
| | | private String updateCookie(final MultiDomainServerState cookie, DN baseDN, final CSN csn) |
| | | { |
| | | synchronized (cookie) |
| | | { // forbid concurrent updates to the cookie |
| | | cookie.update(baseDN, csn); |
| | | return cookie.toString(); |
| | | } |
| | | } |
| | | } |
| | | } |