mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
15.35.2014 5d4dbee77b59c2636e832de6c6a04a8ce65ca5c4
OPENDJ-1541 (CR-4516) Persistent search on cn=changelog can return duplicates

Persistent searches are registered before initial search ends (which is correct).
Because a new change can be added to the changelog before the "initial search" phase is over, the "persistent search" phase can return this change before the "initial search" phase returns it later.

To avoid this problem, persistent searches are marked with an enum indicating which phase is being run. The phases are the following:
1. INITIAL: The "initial search" phase is running; the "persistent search" phase does not return any entry.
2. TRANSITIONING: The "initial search" phase has completed and blocks the currently running "persistent search" phase while the former verifies that no new updates were persisted to the DB.
3. PERSISTENT: The "initial search" phase is finished and completed the transition to the "persistent search" phase. The "persistent search" phase can return all entries.
For the change-number-based persistent searches, only the last changeNumber sent by the "initial search" phase is recorded. For cookie-based persistent searches, for each replica, the last CSN sent by the "initial search" phase is recorded.

The problem is that the transitioning phase has the potential to block the whole server if the client of the persistent search does not consume changes fast enough.
This will be addressed separately.


ChangelogBackend.java:
Added constants COOKIE_ATTACHMENT and ENTRY_SENDER_ATTACHMENT.
Added cookieBasedPersistentSearches and changeNumberBasedPersistentSearches fields.
Added SearchPhase enum.
Added CookieEntrySender, ChangeNumberEntrySender and SendEntryData static inner classes + made several methods static to call them from these classes.
In initialSearchFromCookie(), initialSearchFromChangeNumber(), notifyEntryAdded() and registerPersistentSearch(), set or retrieved attachments + used entrySender.
Extracted methods sendCookieEntriesFromCursor(), sendChangeNumberEntriesFromCursors().
Added initializeAttachements().
Split notifyEntryAdded() in two: notifyCookieEntryAdded() and notifyChangeNumberEntryAdded().
1 files modified
522 ■■■■ changed files
opends/src/server/org/opends/server/backends/ChangelogBackend.java 522 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/backends/ChangelogBackend.java
@@ -26,8 +26,6 @@
package org.opends.server.backends;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
@@ -38,6 +36,9 @@
import java.util.Map;
import java.util.Set;
import java.util.TimeZone;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicReference;
import org.opends.messages.Category;
import org.opends.messages.Message;
@@ -110,6 +111,8 @@
import org.opends.server.types.WritabilityMode;
import org.opends.server.util.StaticUtils;
import com.forgerock.opendj.util.Pair;
import static org.opends.messages.BackendMessages.*;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.config.ConfigConstants.*;
@@ -132,11 +135,11 @@
 * request. The cookie provided in the control is used to retrieve entries from
 * the ReplicaDBs. The <code>changeNumber</code> attribute is not returned with
 * the entries.</li>
 * <li>Draft compatibility mode: when no "ECL Cookie Exchange Control" is provided
 * with the request. The entries are retrieved using the ChangeNumberIndexDB (or
 * DraftDB, hence the name) and their attributes are set with the information
 * from the ReplicasDBs. The <code>changeNumber</code> attribute value is set
 * from the content of ChangeNumberIndexDB.</li>
 * <li>Change number mode: when no "ECL Cookie Exchange Control" is provided
 * with the request. The entries are retrieved using the ChangeNumberIndexDB and
 * their attributes are set with the information from the ReplicasDBs. The
 * <code>changeNumber</code> attribute value is set from the content of
 * ChangeNumberIndexDB.</li>
 * </ul>
 * <h3>Searches flow</h3>
 * <p>
@@ -153,9 +156,8 @@
 * (once, single threaded),</li>
 * <li>
 * {@link ChangelogBackend#search(SearchOperation)} (once, single threaded)</li>
 * <li>
 * {@link ChangelogBackend#notifyEntryAdded(DN, long, String, UpdateMsg)}
 * (multiple times, multi threaded)</li>
 * <li>{@link ChangelogBackend#notify*EntryAdded()} (multiple times, multi
 * threaded)</li>
 * </ol>
 * </li>
 * <li>Persistent searches with <code>changesOnly=true</code> go through:
@@ -163,8 +165,8 @@
 * <li>{@link ChangelogBackend#registerPersistentSearch(PersistentSearch)}
 * (once, single threaded)</li>
 * <li>
 * {@link ChangelogBackend#notifyEntryAdded(DN, long, String, UpdateMsg)}
 * (multiple times, multi threaded)</li>
 * {@link ChangelogBackend#notify*EntryAdded()} (multiple times, multi
 * threaded)</li>
 * </ol>
 * </li>
 * </ul>
@@ -182,6 +184,8 @@
  private static final String CHANGE_NUMBER_ATTR = "changeNumber";
  private static final String CHANGE_NUMBER_ATTR_LC = CHANGE_NUMBER_ATTR.toLowerCase();
  private static final String COOKIE_ATTACHMENT = OID_ECL_COOKIE_EXCHANGE_CONTROL + ".cookie";
  private static final String ENTRY_SENDER_ATTACHMENT = OID_ECL_COOKIE_EXCHANGE_CONTROL + ".entrySender";
  /** The set of objectclasses that will be used in root entry. */
  private static final Map<ObjectClass, String>
@@ -237,6 +241,16 @@
  private final ReplicationServer replicationServer;
  private final ECLEnabledDomainPredicate domainPredicate;
  /** The set of cookie-based persistent searches registered with this backend. */
  private final ConcurrentLinkedQueue<PersistentSearch> cookieBasedPersistentSearches =
      new ConcurrentLinkedQueue<PersistentSearch>();
  /**
   * The set of change number-based persistent searches registered with this
   * backend.
   */
  private final ConcurrentLinkedQueue<PersistentSearch> changeNumberBasedPersistentSearches =
      new ConcurrentLinkedQueue<PersistentSearch>();
  /**
   * Creates a new backend with the provided replication server.
   *
@@ -406,10 +420,49 @@
  }
  /**
   * Notifies persistent searches of this backend that a new entry was added to it.
   * Notifies persistent searches of this backend that a new cookie entry was added to it.
   * <p>
   * Note: This method correspond to the "persistent search" phase.
   * It is executed multiple times per persistent search, multi-threaded, until the persistent search is cancelled.
   * <p>
   * This method must only be called after the provided data have been persisted to disk.
   *
   * @param baseDN
   *          the baseDN of the newly added entry.
   * @param updateMsg
   *          the update message of the newly added entry
   * @throws ChangelogException
   *           If a problem occurs while notifying of the newly added entry.
   */
  public void notifyCookieEntryAdded(DN baseDN, UpdateMsg updateMsg) throws ChangelogException
  {
    if (!(updateMsg instanceof LDAPUpdateMsg))
    {
      return;
    }
    try
    {
      for (PersistentSearch pSearch : cookieBasedPersistentSearches)
      {
        final SearchOperation searchOp = pSearch.getSearchOperation();
        final CookieEntrySender entrySender = searchOp.getAttachment(ENTRY_SENDER_ATTACHMENT);
        entrySender.persistentSearchSendEntry(baseDN, updateMsg);
      }
    }
    catch (DirectoryException e)
    {
      throw new ChangelogException(e.getMessageObject(), e);
    }
  }
  /**
   * Notifies persistent searches of this backend that a new change number entry was added to it.
   * <p>
   * Note: This method correspond to the "persistent search" phase.
   * It is executed multiple times per persistent search, multi-threaded, until the persistent search is cancelled.
   * <p>
   * This method must only be called after the provided data have been persisted to disk.
   *
   * @param baseDN
   *          the baseDN of the newly added entry.
@@ -425,44 +478,23 @@
   * @throws ChangelogException
   *           If a problem occurs while notifying of the newly added entry.
   */
  public void notifyEntryAdded(DN baseDN, long changeNumber, String cookieString, UpdateMsg updateMsg)
  public void notifyChangeNumberEntryAdded(DN baseDN, long changeNumber, String cookieString, UpdateMsg updateMsg)
      throws ChangelogException
  {
    final boolean isCookieEntry = changeNumber <= 0;
    final List<SearchOperation> pSearchOps = getPersistentSearches(isCookieEntry);
    if (pSearchOps.isEmpty() || !(updateMsg instanceof LDAPUpdateMsg))
    if (!(updateMsg instanceof LDAPUpdateMsg))
    {
      return;
    }
    try
    {
      final Entry entry = createEntryFromMsg(baseDN, changeNumber, cookieString, updateMsg);
      for (SearchOperation pSearchOp : pSearchOps)
      // changeNumber entry can be shared with multiple persistent searches
      final Entry changeNumberEntry = createEntryFromMsg(baseDN, changeNumber, cookieString, updateMsg);
      for (PersistentSearch pSearch : changeNumberBasedPersistentSearches)
      {
        final MultiDomainServerState cookie = (MultiDomainServerState)
            pSearchOp.getAttachment(OID_ECL_COOKIE_EXCHANGE_CONTROL);
        // when returning changesOnly, the first incoming update must return
        // the base entry before any other changes,
        // so force sending now, when protected by the synchronized block
        if (isCookieEntry)
        { // cookie based search
          final String cookieStr;
          synchronized (cookie)
          { // forbid concurrent updates to the cookie
            cookie.update(baseDN, updateMsg.getCSN());
            cookieStr = cookie.toString();
          }
          final Entry entry2 = createEntryFromMsg(baseDN, changeNumber, cookieStr, updateMsg);
          // FIXME JNR use this instead of previous line:
          // entry.replaceAttribute(Attributes.create("changelogcookie", cookieStr));
          sendEntryIfMatches(pSearchOp, entry2, cookieStr);
        }
        else
        { // draft changeNumber search
          sendEntryIfMatches(pSearchOp, entry, null);
        }
        final SearchOperation searchOp = pSearch.getSearchOperation();
        final ChangeNumberEntrySender entrySender = searchOp.getAttachment(ENTRY_SENDER_ATTACHMENT);
        entrySender.persistentSearchSendEntry(changeNumber, changeNumberEntry);
      }
    }
    catch (DirectoryException e)
@@ -471,20 +503,6 @@
    }
  }
  private List<SearchOperation> getPersistentSearches(boolean wantCookieBasedSearch)
  {
    final List<SearchOperation> results = new ArrayList<SearchOperation>();
    for (PersistentSearch pSearch : getPersistentSearches())
    {
      final SearchOperation op = pSearch.getSearchOperation();
      if (wantCookieBasedSearch == isCookieBased(op))
      {
        results.add(op);
      }
    }
    return results;
  }
  private boolean isCookieBased(final SearchOperation searchOp)
  {
    for (Control c : searchOp.getRequestControls())
@@ -966,17 +984,22 @@
  {
    validateProvidedCookie(searchParams);
    // send the base changelog entry immediately even for changesOnly=false persistent searches
    if (!sendBaseChangelogEntry(searchOperation))
    {
      // only return the base entry: stop here
      return;
    }
    final MultiDomainServerState cookie = searchParams.cookie;
    final CookieEntrySender entrySender;
    if (isPersistentSearch(searchOperation))
    {
      // communicate the cookie between the "initial search" phase and the "persistent search" phase
      searchOperation.setAttachment(OID_ECL_COOKIE_EXCHANGE_CONTROL, searchParams.cookie);
      searchOperation.setAttachment(COOKIE_ATTACHMENT, cookie);
      entrySender = searchOperation.getAttachment(ENTRY_SENDER_ATTACHMENT);
    }
    else
    {
      entrySender = new CookieEntrySender(searchOperation);
    }
    if (!sendBaseChangelogEntry(searchOperation))
    { // only return the base entry: stop here
      return;
    }
    ECLMultiDomainDBCursor replicaUpdatesCursor = null;
@@ -984,28 +1007,37 @@
    {
      final ReplicationDomainDB replicationDomainDB = getChangelogDB().getReplicationDomainDB();
      final MultiDomainDBCursor cursor = replicationDomainDB.getCursorFrom(
          searchParams.cookie, AFTER_MATCHING_KEY, searchParams.getExcludedBaseDNs());
          cookie, AFTER_MATCHING_KEY, searchParams.getExcludedBaseDNs());
      replicaUpdatesCursor = new ECLMultiDomainDBCursor(domainPredicate, cursor);
      boolean continueSearch = true;
      while (continueSearch && replicaUpdatesCursor.next())
      boolean continueSearch = sendCookieEntriesFromCursor(entrySender, replicaUpdatesCursor, cookie);
      if (continueSearch)
      {
        // Handle the update message
        final UpdateMsg updateMsg = replicaUpdatesCursor.getRecord();
        final DN domainBaseDN = replicaUpdatesCursor.getData();
        searchParams.cookie.update(domainBaseDN, updateMsg.getCSN());
        final String cookieString = searchParams.cookie.toString();
        final Entry entry = createEntryFromMsg(domainBaseDN, 0, cookieString, updateMsg);
        continueSearch = sendEntryIfMatches(searchOperation, entry, cookieString);
        entrySender.transitioningToPersistentSearchPhase();
        sendCookieEntriesFromCursor(entrySender, replicaUpdatesCursor, cookie);
      }
    }
    finally
    {
      entrySender.finalizeInitialSearch();
      StaticUtils.close(replicaUpdatesCursor);
    }
  }
  private boolean sendCookieEntriesFromCursor(final CookieEntrySender entrySender,
      ECLMultiDomainDBCursor replicaUpdatesCursor, final MultiDomainServerState cookie) throws ChangelogException,
      DirectoryException
  {
    boolean continueSearch = true;
    while (continueSearch && replicaUpdatesCursor.next())
    {
      final UpdateMsg updateMsg = replicaUpdatesCursor.getRecord();
      final DN domainBaseDN = replicaUpdatesCursor.getData();
      continueSearch = entrySender.initialSearchSendEntry(updateMsg, domainBaseDN, cookie);
    }
    return continueSearch;
  }
  private boolean isPersistentSearch(SearchOperation op)
  {
    for (PersistentSearch pSearch : getPersistentSearches())
@@ -1022,15 +1054,36 @@
  @Override
  public void registerPersistentSearch(PersistentSearch pSearch)
  {
    final SearchOperation searchOp = pSearch.getSearchOperation();
    if (pSearch.isChangesOnly())
    initializeAttachements(pSearch);
    if (isCookieBased(pSearch.getSearchOperation()))
    {
      // this changesOnly persistent search will not go through #search0() down below
      // so we must initialize the entrySender here and never return the base entry
      searchOp.setAttachment(OID_ECL_COOKIE_EXCHANGE_CONTROL, getNewestCookie(searchOp));
      cookieBasedPersistentSearches.add(pSearch);
    }
    else
    {
      changeNumberBasedPersistentSearches.add(pSearch);
    }
    super.registerPersistentSearch(pSearch);
    }
    super.registerPersistentSearch(pSearch);
  private void initializeAttachements(PersistentSearch pSearch)
  {
    final SearchOperation searchOp = pSearch.getSearchOperation();
    if (isCookieBased(searchOp))
    {
      if (pSearch.isChangesOnly())
      {
        // this changesOnly persistent search will not go through #initialSearch()
        // so we must initialize the cookie here
        searchOp.setAttachment(COOKIE_ATTACHMENT, getNewestCookie(searchOp));
      }
      searchOp.setAttachment(ENTRY_SENDER_ATTACHMENT, new CookieEntrySender(searchOp));
    }
    else
    {
      searchOp.setAttachment(ENTRY_SENDER_ATTACHMENT, new ChangeNumberEntrySender(searchOp));
    }
  }
  private MultiDomainServerState getNewestCookie(SearchOperation searchOp)
@@ -1060,10 +1113,10 @@
   */
  private void validateProvidedCookie(final SearchParams searchParams) throws DirectoryException
  {
    final MultiDomainServerState state = searchParams.cookie;
    if (state != null && !state.isEmpty())
    final MultiDomainServerState cookie = searchParams.cookie;
    if (cookie != null && !cookie.isEmpty())
    {
      replicationServer.validateCookie(state, searchParams.getExcludedBaseDNs());
      replicationServer.validateCookie(cookie, searchParams.getExcludedBaseDNs());
    }
  }
@@ -1076,51 +1129,61 @@
    // "initial search" phase must return the base entry immediately
    sendBaseChangelogEntry(searchOperation);
    final ChangeNumberEntrySender entrySender;
    if (isPersistentSearch(searchOperation))
    {
      entrySender = searchOperation.getAttachment(ENTRY_SENDER_ATTACHMENT);
    }
    else
    {
      entrySender = new ChangeNumberEntrySender(searchOperation);
    }
    DBCursor<ChangeNumberIndexRecord> cnIndexDBCursor = null;
    MultiDomainDBCursor replicaUpdatesCursor = null;
    final AtomicReference<MultiDomainDBCursor> replicaUpdatesCursor = new AtomicReference<MultiDomainDBCursor>();
    try
    {
      cnIndexDBCursor = getCNIndexDBCursor(params.lowestChangeNumber);
      final boolean continueSearch =
          sendChangeNumberEntriesFromCursors(entrySender, params, cnIndexDBCursor, replicaUpdatesCursor);
      if (continueSearch)
      {
        entrySender.transitioningToPersistentSearchPhase();
        sendChangeNumberEntriesFromCursors(entrySender, params, cnIndexDBCursor, replicaUpdatesCursor);
      }
    }
    finally
    {
      entrySender.finalizeInitialSearch();
      StaticUtils.close(cnIndexDBCursor, replicaUpdatesCursor.get());
    }
  }
  private boolean sendChangeNumberEntriesFromCursors(final ChangeNumberEntrySender entrySender,
      final SearchParams params, DBCursor<ChangeNumberIndexRecord> cnIndexDBCursor,
      AtomicReference<MultiDomainDBCursor> replicaUpdatesCursor) throws ChangelogException, DirectoryException
  {
      boolean continueSearch = true;
      while (continueSearch && cnIndexDBCursor.next())
      {
        // Handle the current cnIndex record
        final ChangeNumberIndexRecord cnIndexRecord = cnIndexDBCursor.getRecord();
        if (replicaUpdatesCursor == null)
      if (replicaUpdatesCursor.get() == null)
        {
          replicaUpdatesCursor = initializeReplicaUpdatesCursor(cnIndexRecord);
        replicaUpdatesCursor.set(initializeReplicaUpdatesCursor(cnIndexRecord));
        }
        continueSearch = params.changeNumberIsInRange(cnIndexRecord.getChangeNumber());
        if (continueSearch)
        {
          UpdateMsg updateMsg = findReplicaUpdateMessage(cnIndexRecord, replicaUpdatesCursor);
        final UpdateMsg updateMsg = findReplicaUpdateMessage(cnIndexRecord, replicaUpdatesCursor.get());
          if (updateMsg != null)
          {
            continueSearch = sendEntryForUpdateMessage(searchOperation, cnIndexRecord, updateMsg);
            replicaUpdatesCursor.next();
          continueSearch = entrySender.initialSearchSendEntry(cnIndexRecord, updateMsg);
          replicaUpdatesCursor.get().next();
          }
        }
      }
    }
    finally
    {
      StaticUtils.close(cnIndexDBCursor, replicaUpdatesCursor);
    }
  }
  /**
   * @return {@code true} if search should continue, {@code false} otherwise
   */
  private boolean sendEntryForUpdateMessage(SearchOperation searchOperation,
      ChangeNumberIndexRecord cnIndexRecord, UpdateMsg updateMsg) throws DirectoryException
  {
    final DN baseDN = cnIndexRecord.getBaseDN();
    final MultiDomainServerState cookie = new MultiDomainServerState(cnIndexRecord.getPreviousCookie());
    cookie.update(baseDN, cnIndexRecord.getCSN());
    final String cookieString = cookie.toString();
    final Entry entry = createEntryFromMsg(baseDN, cnIndexRecord.getChangeNumber(), cookieString, updateMsg);
    return sendEntryIfMatches(searchOperation, entry, null);
    return continueSearch;
  }
  private MultiDomainDBCursor initializeReplicaUpdatesCursor(
@@ -1196,8 +1259,8 @@
  /**
   * Creates a changelog entry.
   */
  private Entry createEntryFromMsg(final DN baseDN, final long changeNumber, final String cookie, final UpdateMsg msg)
      throws DirectoryException
  private static Entry createEntryFromMsg(final DN baseDN, final long changeNumber, final String cookie,
      final UpdateMsg msg) throws DirectoryException
  {
    if (msg instanceof AddMsg)
    {
@@ -1224,7 +1287,7 @@
   * change initiators name if available which is contained in the creatorsName
   * attribute.
   */
  private Entry createAddMsg(final DN baseDN, final long changeNumber, final String cookie, final UpdateMsg msg)
  private static Entry createAddMsg(final DN baseDN, final long changeNumber, final String cookie, final UpdateMsg msg)
      throws DirectoryException
  {
    final AddMsg addMsg = (AddMsg) msg;
@@ -1265,8 +1328,8 @@
   * out change initiators name if available which is contained in the
   * modifiersName attribute.
   */
  private Entry createModifyMsg(final DN baseDN, final long changeNumber, final String cookie, final UpdateMsg msg)
      throws DirectoryException
  private static Entry createModifyMsg(final DN baseDN, final long changeNumber, final String cookie,
      final UpdateMsg msg) throws DirectoryException
  {
    final ModifyCommonMsg modifyMsg = (ModifyCommonMsg) msg;
    String changeInitiatorsName = null;
@@ -1330,7 +1393,7 @@
   * @param entryDN
   *            DN of original entry
   */
  private void logEncodingMessageError(String messageType, DN entryDN,  Exception exception)
  private static void logEncodingMessageError(String messageType, DN entryDN, Exception exception)
  {
    TRACER.debugCaught(DebugLogLevel.ERROR, exception);
    logError(Message.raw(Category.SYNC, Severity.MILD_ERROR,
@@ -1359,7 +1422,7 @@
    String dnString;
    if (changeNumber > 0)
    {
      // Draft compat mode
      // change number mode
      dnString = "changeNumber=" + changeNumber + "," + DN_EXTERNAL_CHANGELOG_ROOT;
    }
    else
@@ -1439,7 +1502,8 @@
   *
   * @return {@code true} if search should continue, {@code false} otherwise
   */
  private boolean sendEntryIfMatches(SearchOperation searchOp, Entry entry, String cookie) throws DirectoryException
  private static boolean sendEntryIfMatches(SearchOperation searchOp, Entry entry, String cookie)
      throws DirectoryException
  {
    if (matchBaseAndScopeAndFilter(searchOp, entry))
    {
@@ -1450,18 +1514,18 @@
  }
  /** Indicates if the provided entry matches the filter, base and scope. */
  private boolean matchBaseAndScopeAndFilter(SearchOperation searchOp, Entry entry) throws DirectoryException
  private static boolean matchBaseAndScopeAndFilter(SearchOperation searchOp, Entry entry) throws DirectoryException
  {
    return entry.matchesBaseAndScope(searchOp.getBaseDN(), searchOp.getScope())
        && searchOp.getFilter().matchesEntry(entry);
  }
  private List<Control> getControls(String cookie)
  private static List<Control> getControls(String cookie)
  {
    if (cookie != null)
    {
      Control c = new EntryChangelogNotificationControl(true, cookie);
      return Arrays.asList(c);
      final Control c = new EntryChangelogNotificationControl(true, cookie);
      return Collections.singletonList(c);
    }
    return Collections.emptyList();
  }
@@ -1557,4 +1621,218 @@
    }
  }
  /**
   * Describes the current search phase.
   */
  private enum SearchPhase
  {
    /**
     * "Initial search" phase. The "initial search" phase is running
     * concurrently. All update notifications are ignored.
     */
    INITIAL,
    /**
     * Transitioning from the "initial search" phase to the "persistent search"
     * phase. "Initial search" phase has finished reading from the DB. It now
     * verifies if any more updates have been persisted to the DB since stopping
     * and send them. All update notifications are blocked.
     */
    TRANSITIONING,
    /**
     * "Persistent search" phase. "Initial search" phase has completed. All
     * update notifications are published.
     */
    PERSISTENT;
  }
  /**
   * Contains data to ensure that the same change is not sent twice to clients
   * because of race conditions between the "initial search" phase and the
   * "persistent search" phase.
   */
  private static class SendEntryData<K extends Comparable<K>>
  {
    private final AtomicReference<SearchPhase> searchPhase = new AtomicReference<SearchPhase>(SearchPhase.INITIAL);
    private final Object transitioningLock = new Object();
    private volatile K lastKeySentByInitialSearch;
    private void finalizeInitialSearch()
    {
      searchPhase.set(SearchPhase.PERSISTENT);
      synchronized (transitioningLock)
      { // initial search phase has completed, release all persistent searches
        transitioningLock.notifyAll();
      }
    }
    public void transitioningToPersistentSearchPhase()
    {
      searchPhase.set(SearchPhase.TRANSITIONING);
    }
    private void initialSearchSendsEntry(final K key)
    {
      lastKeySentByInitialSearch = key;
    }
    private boolean persistentSearchCanSendEntry(K key)
    {
      final SearchPhase stateValue = searchPhase.get();
      switch (stateValue)
      {
      case INITIAL:
        return false;
      case TRANSITIONING:
        synchronized (transitioningLock)
        {
          while (SearchPhase.TRANSITIONING.equals(searchPhase.get()))
          {
            // "initial search" phase is over, and is now verifying whether new
            // changes have been published to the DB.
            // Wait for this check to complete
            try
            {
              transitioningLock.wait();
            }
            catch (InterruptedException e)
            {
              Thread.currentThread().interrupt();
              // Shutdown must have been called. Stop sending entries.
              return false;
            }
          }
        }
        return key.compareTo(lastKeySentByInitialSearch) > 0;
      case PERSISTENT:
        return true;
      default:
        throw new RuntimeException("Not implemented for " + stateValue);
      }
    }
  }
  /** Sends entries to clients for change number searches. */
  private static class ChangeNumberEntrySender
  {
    private final SearchOperation searchOp;
    private final SendEntryData<Long> sendEntryData = new SendEntryData<Long>();
    private ChangeNumberEntrySender(SearchOperation searchOp)
    {
      this.searchOp = searchOp;
    }
    private void finalizeInitialSearch()
    {
      sendEntryData.finalizeInitialSearch();
    }
    public void transitioningToPersistentSearchPhase()
    {
      sendEntryData.transitioningToPersistentSearchPhase();
    }
    /**
     * @return {@code true} if search should continue, {@code false} otherwise
     */
    private boolean initialSearchSendEntry(ChangeNumberIndexRecord cnIndexRecord, UpdateMsg updateMsg)
        throws DirectoryException
    {
      final DN baseDN = cnIndexRecord.getBaseDN();
      final MultiDomainServerState cookie = new MultiDomainServerState(cnIndexRecord.getPreviousCookie());
      cookie.update(baseDN, cnIndexRecord.getCSN());
      final String cookieString = cookie.toString();
      sendEntryData.initialSearchSendsEntry(cnIndexRecord.getChangeNumber());
      final Entry entry = createEntryFromMsg(baseDN, cnIndexRecord.getChangeNumber(), cookieString, updateMsg);
      return sendEntryIfMatches(searchOp, entry, null);
    }
    private void persistentSearchSendEntry(long changeNumber, Entry entry) throws DirectoryException
    {
      if (sendEntryData.persistentSearchCanSendEntry(changeNumber))
      {
        sendEntryIfMatches(searchOp, entry, null);
      }
    }
  }
  /** Sends entries to clients for cookie-based searches. */
  private static class CookieEntrySender {
    private final SearchOperation searchOp;
    private final ConcurrentSkipListMap<Pair<DN, Integer>, SendEntryData<CSN>> replicaIdToSendEntryData =
        new ConcurrentSkipListMap<Pair<DN, Integer>, SendEntryData<CSN>>(Pair.COMPARATOR);
    private CookieEntrySender(SearchOperation searchOp)
    {
      this.searchOp = searchOp;
    }
    public void finalizeInitialSearch()
    {
      for (SendEntryData<CSN> sendEntryData : replicaIdToSendEntryData.values())
      {
        sendEntryData.finalizeInitialSearch();
      }
    }
    public void transitioningToPersistentSearchPhase()
    {
      for (SendEntryData<CSN> sendEntryData : replicaIdToSendEntryData.values())
      {
        sendEntryData.transitioningToPersistentSearchPhase();
      }
    }
    private SendEntryData<CSN> getSendEntryData(DN baseDN, CSN csn)
    {
      final Pair<DN, Integer> replicaId = Pair.of(baseDN, csn.getServerId());
      SendEntryData<CSN> data = replicaIdToSendEntryData.get(replicaId);
      if (data == null)
      {
        final SendEntryData<CSN> newData = new SendEntryData<CSN>();
        data = replicaIdToSendEntryData.putIfAbsent(replicaId, newData);
        return data == null ? newData : data;
      }
      return data;
    }
    private boolean initialSearchSendEntry(final UpdateMsg updateMsg, final DN baseDN,
        final MultiDomainServerState cookie) throws DirectoryException
    {
      final CSN csn = updateMsg.getCSN();
      final SendEntryData<CSN> sendEntryData = getSendEntryData(baseDN, csn);
      sendEntryData.initialSearchSendsEntry(csn);
      final String cookieString = updateCookie(cookie, baseDN, updateMsg.getCSN());
      final Entry entry = createEntryFromMsg(baseDN, 0, cookieString, updateMsg);
      return sendEntryIfMatches(searchOp, entry, cookieString);
    }
    private void persistentSearchSendEntry(DN baseDN, UpdateMsg updateMsg)
        throws DirectoryException
    {
      final CSN csn = updateMsg.getCSN();
      final SendEntryData<CSN> sendEntryData = getSendEntryData(baseDN, csn);
      if (sendEntryData.persistentSearchCanSendEntry(csn))
      {
        // multi threaded case: wait for the "initial search" phase to set the cookie
        final MultiDomainServerState cookie = searchOp.getAttachment(COOKIE_ATTACHMENT);
        final String cookieString = updateCookie(cookie, baseDN, updateMsg.getCSN());
        final Entry cookieEntry = createEntryFromMsg(baseDN, 0, cookieString, updateMsg);
        // FIXME JNR use this instead of previous line:
        // entry.replaceAttribute(Attributes.create("changelogcookie", cookieString));
        sendEntryIfMatches(searchOp, cookieEntry, cookieString);
      }
    }
    private String updateCookie(final MultiDomainServerState cookie, DN baseDN, final CSN csn)
    {
      synchronized (cookie)
      { // forbid concurrent updates to the cookie
        cookie.update(baseDN, csn);
        return cookie.toString();
      }
    }
  }
}