From 5d4dbee77b59c2636e832de6c6a04a8ce65ca5c4 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Mon, 15 Sep 2014 09:35:18 +0000
Subject: [PATCH] OPENDJ-1541 (CR-4516) Persistent search on cn=changelog can return duplicates

---
 opends/src/server/org/opends/server/backends/ChangelogBackend.java |  536 +++++++++++++++++++++++++++++++++++++++++++++--------------
 1 files changed, 407 insertions(+), 129 deletions(-)

diff --git a/opends/src/server/org/opends/server/backends/ChangelogBackend.java b/opends/src/server/org/opends/server/backends/ChangelogBackend.java
index be5fe9b..8445800 100644
--- a/opends/src/server/org/opends/server/backends/ChangelogBackend.java
+++ b/opends/src/server/org/opends/server/backends/ChangelogBackend.java
@@ -26,8 +26,6 @@
 package org.opends.server.backends;
 
 import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Date;
@@ -38,6 +36,9 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.TimeZone;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.opends.messages.Category;
 import org.opends.messages.Message;
@@ -110,6 +111,8 @@
 import org.opends.server.types.WritabilityMode;
 import org.opends.server.util.StaticUtils;
 
+import com.forgerock.opendj.util.Pair;
+
 import static org.opends.messages.BackendMessages.*;
 import static org.opends.messages.ReplicationMessages.*;
 import static org.opends.server.config.ConfigConstants.*;
@@ -132,11 +135,11 @@
  * request. The cookie provided in the control is used to retrieve entries from
  * the ReplicaDBs. The <code>changeNumber</code> attribute is not returned with
  * the entries.</li>
- * <li>Draft compatibility mode: when no "ECL Cookie Exchange Control" is provided
- * with the request. The entries are retrieved using the ChangeNumberIndexDB (or
- * DraftDB, hence the name) and their attributes are set with the information
- * from the ReplicasDBs. The <code>changeNumber</code> attribute value is set
- * from the content of ChangeNumberIndexDB.</li>
+ * <li>Change number mode: when no "ECL Cookie Exchange Control" is provided
+ * with the request. The entries are retrieved using the ChangeNumberIndexDB and
+ * their attributes are set with the information from the ReplicasDBs. The
+ * <code>changeNumber</code> attribute value is set from the content of
+ * ChangeNumberIndexDB.</li>
  * </ul>
  * <h3>Searches flow</h3>
  * <p>
@@ -153,9 +156,8 @@
  * (once, single threaded),</li>
  * <li>
  * {@link ChangelogBackend#search(SearchOperation)} (once, single threaded)</li>
- * <li>
- * {@link ChangelogBackend#notifyEntryAdded(DN, long, String, UpdateMsg)}
- * (multiple times, multi threaded)</li>
+ * <li>{@link ChangelogBackend#notify*EntryAdded()} (multiple times, multi
+ * threaded)</li>
  * </ol>
  * </li>
  * <li>Persistent searches with <code>changesOnly=true</code> go through:
@@ -163,8 +165,8 @@
  * <li>{@link ChangelogBackend#registerPersistentSearch(PersistentSearch)}
  * (once, single threaded)</li>
  * <li>
- * {@link ChangelogBackend#notifyEntryAdded(DN, long, String, UpdateMsg)}
- * (multiple times, multi threaded)</li>
+ * {@link ChangelogBackend#notify*EntryAdded()} (multiple times, multi
+ * threaded)</li>
  * </ol>
  * </li>
  * </ul>
@@ -182,6 +184,8 @@
 
   private static final String CHANGE_NUMBER_ATTR = "changeNumber";
   private static final String CHANGE_NUMBER_ATTR_LC = CHANGE_NUMBER_ATTR.toLowerCase();
+  private static final String COOKIE_ATTACHMENT = OID_ECL_COOKIE_EXCHANGE_CONTROL + ".cookie";
+  private static final String ENTRY_SENDER_ATTACHMENT = OID_ECL_COOKIE_EXCHANGE_CONTROL + ".entrySender";
 
   /** The set of objectclasses that will be used in root entry. */
   private static final Map<ObjectClass, String>
@@ -237,6 +241,16 @@
   private final ReplicationServer replicationServer;
   private final ECLEnabledDomainPredicate domainPredicate;
 
+  /** The set of cookie-based persistent searches registered with this backend. */
+  private final ConcurrentLinkedQueue<PersistentSearch> cookieBasedPersistentSearches =
+      new ConcurrentLinkedQueue<PersistentSearch>();
+  /**
+   * The set of change number-based persistent searches registered with this
+   * backend.
+   */
+  private final ConcurrentLinkedQueue<PersistentSearch> changeNumberBasedPersistentSearches =
+      new ConcurrentLinkedQueue<PersistentSearch>();
+
   /**
    * Creates a new backend with the provided replication server.
    *
@@ -406,10 +420,49 @@
   }
 
   /**
-   * Notifies persistent searches of this backend that a new entry was added to it.
+   * Notifies persistent searches of this backend that a new cookie entry was added to it.
    * <p>
    * Note: This method correspond to the "persistent search" phase.
    * It is executed multiple times per persistent search, multi-threaded, until the persistent search is cancelled.
+   * <p>
+   * This method must only be called after the provided data have been persisted to disk.
+   *
+   * @param baseDN
+   *          the baseDN of the newly added entry.
+   * @param updateMsg
+   *          the update message of the newly added entry
+   * @throws ChangelogException
+   *           If a problem occurs while notifying of the newly added entry.
+   */
+  public void notifyCookieEntryAdded(DN baseDN, UpdateMsg updateMsg) throws ChangelogException
+  {
+    if (!(updateMsg instanceof LDAPUpdateMsg))
+    {
+      return;
+    }
+
+    try
+    {
+      for (PersistentSearch pSearch : cookieBasedPersistentSearches)
+      {
+        final SearchOperation searchOp = pSearch.getSearchOperation();
+        final CookieEntrySender entrySender = searchOp.getAttachment(ENTRY_SENDER_ATTACHMENT);
+        entrySender.persistentSearchSendEntry(baseDN, updateMsg);
+      }
+    }
+    catch (DirectoryException e)
+    {
+      throw new ChangelogException(e.getMessageObject(), e);
+    }
+  }
+
+  /**
+   * Notifies persistent searches of this backend that a new change number entry was added to it.
+   * <p>
+   * Note: This method correspond to the "persistent search" phase.
+   * It is executed multiple times per persistent search, multi-threaded, until the persistent search is cancelled.
+   * <p>
+   * This method must only be called after the provided data have been persisted to disk.
    *
    * @param baseDN
    *          the baseDN of the newly added entry.
@@ -425,44 +478,23 @@
    * @throws ChangelogException
    *           If a problem occurs while notifying of the newly added entry.
    */
-  public void notifyEntryAdded(DN baseDN, long changeNumber, String cookieString, UpdateMsg updateMsg)
+  public void notifyChangeNumberEntryAdded(DN baseDN, long changeNumber, String cookieString, UpdateMsg updateMsg)
       throws ChangelogException
   {
-    final boolean isCookieEntry = changeNumber <= 0;
-    final List<SearchOperation> pSearchOps = getPersistentSearches(isCookieEntry);
-    if (pSearchOps.isEmpty() || !(updateMsg instanceof LDAPUpdateMsg))
+    if (!(updateMsg instanceof LDAPUpdateMsg))
     {
       return;
     }
 
     try
     {
-      final Entry entry = createEntryFromMsg(baseDN, changeNumber, cookieString, updateMsg);
-      for (SearchOperation pSearchOp : pSearchOps)
+      // changeNumber entry can be shared with multiple persistent searches
+      final Entry changeNumberEntry = createEntryFromMsg(baseDN, changeNumber, cookieString, updateMsg);
+      for (PersistentSearch pSearch : changeNumberBasedPersistentSearches)
       {
-        final MultiDomainServerState cookie = (MultiDomainServerState)
-            pSearchOp.getAttachment(OID_ECL_COOKIE_EXCHANGE_CONTROL);
-
-        // when returning changesOnly, the first incoming update must return
-        // the base entry before any other changes,
-        // so force sending now, when protected by the synchronized block
-        if (isCookieEntry)
-        { // cookie based search
-          final String cookieStr;
-          synchronized (cookie)
-          { // forbid concurrent updates to the cookie
-            cookie.update(baseDN, updateMsg.getCSN());
-            cookieStr = cookie.toString();
-          }
-          final Entry entry2 = createEntryFromMsg(baseDN, changeNumber, cookieStr, updateMsg);
-          // FIXME JNR use this instead of previous line:
-          // entry.replaceAttribute(Attributes.create("changelogcookie", cookieStr));
-          sendEntryIfMatches(pSearchOp, entry2, cookieStr);
-        }
-        else
-        { // draft changeNumber search
-          sendEntryIfMatches(pSearchOp, entry, null);
-        }
+        final SearchOperation searchOp = pSearch.getSearchOperation();
+        final ChangeNumberEntrySender entrySender = searchOp.getAttachment(ENTRY_SENDER_ATTACHMENT);
+        entrySender.persistentSearchSendEntry(changeNumber, changeNumberEntry);
       }
     }
     catch (DirectoryException e)
@@ -471,20 +503,6 @@
     }
   }
 
-  private List<SearchOperation> getPersistentSearches(boolean wantCookieBasedSearch)
-  {
-    final List<SearchOperation> results = new ArrayList<SearchOperation>();
-    for (PersistentSearch pSearch : getPersistentSearches())
-    {
-      final SearchOperation op = pSearch.getSearchOperation();
-      if (wantCookieBasedSearch == isCookieBased(op))
-      {
-        results.add(op);
-      }
-    }
-    return results;
-  }
-
   private boolean isCookieBased(final SearchOperation searchOp)
   {
     for (Control c : searchOp.getRequestControls())
@@ -966,17 +984,22 @@
   {
     validateProvidedCookie(searchParams);
 
-    // send the base changelog entry immediately even for changesOnly=false persistent searches
-    if (!sendBaseChangelogEntry(searchOperation))
-    {
-      // only return the base entry: stop here
-      return;
-    }
-
+    final MultiDomainServerState cookie = searchParams.cookie;
+    final CookieEntrySender entrySender;
     if (isPersistentSearch(searchOperation))
     {
       // communicate the cookie between the "initial search" phase and the "persistent search" phase
-      searchOperation.setAttachment(OID_ECL_COOKIE_EXCHANGE_CONTROL, searchParams.cookie);
+      searchOperation.setAttachment(COOKIE_ATTACHMENT, cookie);
+      entrySender = searchOperation.getAttachment(ENTRY_SENDER_ATTACHMENT);
+    }
+    else
+    {
+      entrySender = new CookieEntrySender(searchOperation);
+    }
+
+    if (!sendBaseChangelogEntry(searchOperation))
+    { // only return the base entry: stop here
+      return;
     }
 
     ECLMultiDomainDBCursor replicaUpdatesCursor = null;
@@ -984,28 +1007,37 @@
     {
       final ReplicationDomainDB replicationDomainDB = getChangelogDB().getReplicationDomainDB();
       final MultiDomainDBCursor cursor = replicationDomainDB.getCursorFrom(
-          searchParams.cookie, AFTER_MATCHING_KEY, searchParams.getExcludedBaseDNs());
+          cookie, AFTER_MATCHING_KEY, searchParams.getExcludedBaseDNs());
       replicaUpdatesCursor = new ECLMultiDomainDBCursor(domainPredicate, cursor);
 
-      boolean continueSearch = true;
-      while (continueSearch && replicaUpdatesCursor.next())
+      boolean continueSearch = sendCookieEntriesFromCursor(entrySender, replicaUpdatesCursor, cookie);
+      if (continueSearch)
       {
-        // Handle the update message
-        final UpdateMsg updateMsg = replicaUpdatesCursor.getRecord();
-        final DN domainBaseDN = replicaUpdatesCursor.getData();
-        searchParams.cookie.update(domainBaseDN, updateMsg.getCSN());
-        final String cookieString = searchParams.cookie.toString();
-
-        final Entry entry = createEntryFromMsg(domainBaseDN, 0, cookieString, updateMsg);
-        continueSearch = sendEntryIfMatches(searchOperation, entry, cookieString);
+        entrySender.transitioningToPersistentSearchPhase();
+        sendCookieEntriesFromCursor(entrySender, replicaUpdatesCursor, cookie);
       }
     }
     finally
     {
+      entrySender.finalizeInitialSearch();
       StaticUtils.close(replicaUpdatesCursor);
     }
   }
 
+  private boolean sendCookieEntriesFromCursor(final CookieEntrySender entrySender,
+      ECLMultiDomainDBCursor replicaUpdatesCursor, final MultiDomainServerState cookie) throws ChangelogException,
+      DirectoryException
+  {
+    boolean continueSearch = true;
+    while (continueSearch && replicaUpdatesCursor.next())
+    {
+      final UpdateMsg updateMsg = replicaUpdatesCursor.getRecord();
+      final DN domainBaseDN = replicaUpdatesCursor.getData();
+      continueSearch = entrySender.initialSearchSendEntry(updateMsg, domainBaseDN, cookie);
+    }
+    return continueSearch;
+  }
+
   private boolean isPersistentSearch(SearchOperation op)
   {
     for (PersistentSearch pSearch : getPersistentSearches())
@@ -1022,17 +1054,38 @@
   @Override
   public void registerPersistentSearch(PersistentSearch pSearch)
   {
-    final SearchOperation searchOp = pSearch.getSearchOperation();
-    if (pSearch.isChangesOnly())
-    {
-      // this changesOnly persistent search will not go through #search0() down below
-      // so we must initialize the entrySender here and never return the base entry
-      searchOp.setAttachment(OID_ECL_COOKIE_EXCHANGE_CONTROL, getNewestCookie(searchOp));
-    }
+    initializeAttachements(pSearch);
 
+    if (isCookieBased(pSearch.getSearchOperation()))
+    {
+      cookieBasedPersistentSearches.add(pSearch);
+    }
+    else
+    {
+      changeNumberBasedPersistentSearches.add(pSearch);
+    }
     super.registerPersistentSearch(pSearch);
   }
 
+  private void initializeAttachements(PersistentSearch pSearch)
+  {
+    final SearchOperation searchOp = pSearch.getSearchOperation();
+    if (isCookieBased(searchOp))
+    {
+      if (pSearch.isChangesOnly())
+      {
+        // this changesOnly persistent search will not go through #initialSearch()
+        // so we must initialize the cookie here
+        searchOp.setAttachment(COOKIE_ATTACHMENT, getNewestCookie(searchOp));
+      }
+      searchOp.setAttachment(ENTRY_SENDER_ATTACHMENT, new CookieEntrySender(searchOp));
+    }
+    else
+    {
+      searchOp.setAttachment(ENTRY_SENDER_ATTACHMENT, new ChangeNumberEntrySender(searchOp));
+    }
+  }
+
   private MultiDomainServerState getNewestCookie(SearchOperation searchOp)
   {
     if (!isCookieBased(searchOp))
@@ -1060,10 +1113,10 @@
    */
   private void validateProvidedCookie(final SearchParams searchParams) throws DirectoryException
   {
-    final MultiDomainServerState state = searchParams.cookie;
-    if (state != null && !state.isEmpty())
+    final MultiDomainServerState cookie = searchParams.cookie;
+    if (cookie != null && !cookie.isEmpty())
     {
-      replicationServer.validateCookie(state, searchParams.getExcludedBaseDNs());
+      replicationServer.validateCookie(cookie, searchParams.getExcludedBaseDNs());
     }
   }
 
@@ -1076,51 +1129,61 @@
     // "initial search" phase must return the base entry immediately
     sendBaseChangelogEntry(searchOperation);
 
+    final ChangeNumberEntrySender entrySender;
+    if (isPersistentSearch(searchOperation))
+    {
+      entrySender = searchOperation.getAttachment(ENTRY_SENDER_ATTACHMENT);
+    }
+    else
+    {
+      entrySender = new ChangeNumberEntrySender(searchOperation);
+    }
+
     DBCursor<ChangeNumberIndexRecord> cnIndexDBCursor = null;
-    MultiDomainDBCursor replicaUpdatesCursor = null;
+    final AtomicReference<MultiDomainDBCursor> replicaUpdatesCursor = new AtomicReference<MultiDomainDBCursor>();
     try
     {
       cnIndexDBCursor = getCNIndexDBCursor(params.lowestChangeNumber);
-      boolean continueSearch = true;
-      while (continueSearch && cnIndexDBCursor.next())
+      final boolean continueSearch =
+          sendChangeNumberEntriesFromCursors(entrySender, params, cnIndexDBCursor, replicaUpdatesCursor);
+      if (continueSearch)
       {
-        // Handle the current cnIndex record
-        final ChangeNumberIndexRecord cnIndexRecord = cnIndexDBCursor.getRecord();
-        if (replicaUpdatesCursor == null)
-        {
-          replicaUpdatesCursor = initializeReplicaUpdatesCursor(cnIndexRecord);
-        }
-        continueSearch = params.changeNumberIsInRange(cnIndexRecord.getChangeNumber());
-        if (continueSearch)
-        {
-          UpdateMsg updateMsg = findReplicaUpdateMessage(cnIndexRecord, replicaUpdatesCursor);
-          if (updateMsg != null)
-          {
-            continueSearch = sendEntryForUpdateMessage(searchOperation, cnIndexRecord, updateMsg);
-            replicaUpdatesCursor.next();
-          }
-        }
+        entrySender.transitioningToPersistentSearchPhase();
+        sendChangeNumberEntriesFromCursors(entrySender, params, cnIndexDBCursor, replicaUpdatesCursor);
       }
     }
     finally
     {
-      StaticUtils.close(cnIndexDBCursor, replicaUpdatesCursor);
+      entrySender.finalizeInitialSearch();
+      StaticUtils.close(cnIndexDBCursor, replicaUpdatesCursor.get());
     }
   }
 
-  /**
-   * @return {@code true} if search should continue, {@code false} otherwise
-   */
-  private boolean sendEntryForUpdateMessage(SearchOperation searchOperation,
-      ChangeNumberIndexRecord cnIndexRecord, UpdateMsg updateMsg) throws DirectoryException
+  private boolean sendChangeNumberEntriesFromCursors(final ChangeNumberEntrySender entrySender,
+      final SearchParams params, DBCursor<ChangeNumberIndexRecord> cnIndexDBCursor,
+      AtomicReference<MultiDomainDBCursor> replicaUpdatesCursor) throws ChangelogException, DirectoryException
   {
-    final DN baseDN = cnIndexRecord.getBaseDN();
-    final MultiDomainServerState cookie = new MultiDomainServerState(cnIndexRecord.getPreviousCookie());
-    cookie.update(baseDN, cnIndexRecord.getCSN());
-    final String cookieString = cookie.toString();
-
-    final Entry entry = createEntryFromMsg(baseDN, cnIndexRecord.getChangeNumber(), cookieString, updateMsg);
-    return sendEntryIfMatches(searchOperation, entry, null);
+    boolean continueSearch = true;
+    while (continueSearch && cnIndexDBCursor.next())
+    {
+      // Handle the current cnIndex record
+      final ChangeNumberIndexRecord cnIndexRecord = cnIndexDBCursor.getRecord();
+      if (replicaUpdatesCursor.get() == null)
+      {
+        replicaUpdatesCursor.set(initializeReplicaUpdatesCursor(cnIndexRecord));
+      }
+      continueSearch = params.changeNumberIsInRange(cnIndexRecord.getChangeNumber());
+      if (continueSearch)
+      {
+        final UpdateMsg updateMsg = findReplicaUpdateMessage(cnIndexRecord, replicaUpdatesCursor.get());
+        if (updateMsg != null)
+        {
+          continueSearch = entrySender.initialSearchSendEntry(cnIndexRecord, updateMsg);
+          replicaUpdatesCursor.get().next();
+        }
+      }
+    }
+    return continueSearch;
   }
 
   private MultiDomainDBCursor initializeReplicaUpdatesCursor(
@@ -1196,8 +1259,8 @@
   /**
    * Creates a changelog entry.
    */
-  private Entry createEntryFromMsg(final DN baseDN, final long changeNumber, final String cookie, final UpdateMsg msg)
-      throws DirectoryException
+  private static Entry createEntryFromMsg(final DN baseDN, final long changeNumber, final String cookie,
+      final UpdateMsg msg) throws DirectoryException
   {
     if (msg instanceof AddMsg)
     {
@@ -1224,7 +1287,7 @@
    * change initiators name if available which is contained in the creatorsName
    * attribute.
    */
-  private Entry createAddMsg(final DN baseDN, final long changeNumber, final String cookie, final UpdateMsg msg)
+  private static Entry createAddMsg(final DN baseDN, final long changeNumber, final String cookie, final UpdateMsg msg)
       throws DirectoryException
   {
     final AddMsg addMsg = (AddMsg) msg;
@@ -1265,8 +1328,8 @@
    * out change initiators name if available which is contained in the
    * modifiersName attribute.
    */
-  private Entry createModifyMsg(final DN baseDN, final long changeNumber, final String cookie, final UpdateMsg msg)
-      throws DirectoryException
+  private static Entry createModifyMsg(final DN baseDN, final long changeNumber, final String cookie,
+      final UpdateMsg msg) throws DirectoryException
   {
     final ModifyCommonMsg modifyMsg = (ModifyCommonMsg) msg;
     String changeInitiatorsName = null;
@@ -1330,7 +1393,7 @@
    * @param entryDN
    *            DN of original entry
    */
-  private void logEncodingMessageError(String messageType, DN entryDN,  Exception exception)
+  private static void logEncodingMessageError(String messageType, DN entryDN, Exception exception)
   {
     TRACER.debugCaught(DebugLogLevel.ERROR, exception);
     logError(Message.raw(Category.SYNC, Severity.MILD_ERROR,
@@ -1359,7 +1422,7 @@
     String dnString;
     if (changeNumber > 0)
     {
-      // Draft compat mode
+      // change number mode
       dnString = "changeNumber=" + changeNumber + "," + DN_EXTERNAL_CHANGELOG_ROOT;
     }
     else
@@ -1439,7 +1502,8 @@
    *
    * @return {@code true} if search should continue, {@code false} otherwise
    */
-  private boolean sendEntryIfMatches(SearchOperation searchOp, Entry entry, String cookie) throws DirectoryException
+  private static boolean sendEntryIfMatches(SearchOperation searchOp, Entry entry, String cookie)
+      throws DirectoryException
   {
     if (matchBaseAndScopeAndFilter(searchOp, entry))
     {
@@ -1450,18 +1514,18 @@
   }
 
   /** Indicates if the provided entry matches the filter, base and scope. */
-  private boolean matchBaseAndScopeAndFilter(SearchOperation searchOp, Entry entry) throws DirectoryException
+  private static boolean matchBaseAndScopeAndFilter(SearchOperation searchOp, Entry entry) throws DirectoryException
   {
     return entry.matchesBaseAndScope(searchOp.getBaseDN(), searchOp.getScope())
         && searchOp.getFilter().matchesEntry(entry);
   }
 
-  private List<Control> getControls(String cookie)
+  private static List<Control> getControls(String cookie)
   {
     if (cookie != null)
     {
-      Control c = new EntryChangelogNotificationControl(true, cookie);
-      return Arrays.asList(c);
+      final Control c = new EntryChangelogNotificationControl(true, cookie);
+      return Collections.singletonList(c);
     }
     return Collections.emptyList();
   }
@@ -1557,4 +1621,218 @@
     }
   }
 
+  /**
+   * Describes the current search phase.
+   */
+  private enum SearchPhase
+  {
+    /**
+     * "Initial search" phase. The "initial search" phase is running
+     * concurrently. All update notifications are ignored.
+     */
+    INITIAL,
+    /**
+     * Transitioning from the "initial search" phase to the "persistent search"
+     * phase. "Initial search" phase has finished reading from the DB. It now
+     * verifies if any more updates have been persisted to the DB since stopping
+     * and send them. All update notifications are blocked.
+     */
+    TRANSITIONING,
+    /**
+     * "Persistent search" phase. "Initial search" phase has completed. All
+     * update notifications are published.
+     */
+    PERSISTENT;
+  }
+
+  /**
+   * Contains data to ensure that the same change is not sent twice to clients
+   * because of race conditions between the "initial search" phase and the
+   * "persistent search" phase.
+   */
+  private static class SendEntryData<K extends Comparable<K>>
+  {
+    private final AtomicReference<SearchPhase> searchPhase = new AtomicReference<SearchPhase>(SearchPhase.INITIAL);
+    private final Object transitioningLock = new Object();
+    private volatile K lastKeySentByInitialSearch;
+
+    private void finalizeInitialSearch()
+    {
+      searchPhase.set(SearchPhase.PERSISTENT);
+      synchronized (transitioningLock)
+      { // initial search phase has completed, release all persistent searches
+        transitioningLock.notifyAll();
+      }
+    }
+
+    public void transitioningToPersistentSearchPhase()
+    {
+      searchPhase.set(SearchPhase.TRANSITIONING);
+    }
+
+    private void initialSearchSendsEntry(final K key)
+    {
+      lastKeySentByInitialSearch = key;
+    }
+
+    private boolean persistentSearchCanSendEntry(K key)
+    {
+      final SearchPhase stateValue = searchPhase.get();
+      switch (stateValue)
+      {
+      case INITIAL:
+        return false;
+      case TRANSITIONING:
+        synchronized (transitioningLock)
+        {
+          while (SearchPhase.TRANSITIONING.equals(searchPhase.get()))
+          {
+            // "initial search" phase is over, and is now verifying whether new
+            // changes have been published to the DB.
+            // Wait for this check to complete
+            try
+            {
+              transitioningLock.wait();
+            }
+            catch (InterruptedException e)
+            {
+              Thread.currentThread().interrupt();
+              // Shutdown must have been called. Stop sending entries.
+              return false;
+            }
+          }
+        }
+        return key.compareTo(lastKeySentByInitialSearch) > 0;
+      case PERSISTENT:
+        return true;
+      default:
+        throw new RuntimeException("Not implemented for " + stateValue);
+      }
+    }
+  }
+
+  /** Sends entries to clients for change number searches. */
+  private static class ChangeNumberEntrySender
+  {
+    private final SearchOperation searchOp;
+    private final SendEntryData<Long> sendEntryData = new SendEntryData<Long>();
+
+    private ChangeNumberEntrySender(SearchOperation searchOp)
+    {
+      this.searchOp = searchOp;
+    }
+
+    private void finalizeInitialSearch()
+    {
+      sendEntryData.finalizeInitialSearch();
+    }
+
+    public void transitioningToPersistentSearchPhase()
+    {
+      sendEntryData.transitioningToPersistentSearchPhase();
+    }
+
+    /**
+     * @return {@code true} if search should continue, {@code false} otherwise
+     */
+    private boolean initialSearchSendEntry(ChangeNumberIndexRecord cnIndexRecord, UpdateMsg updateMsg)
+        throws DirectoryException
+    {
+      final DN baseDN = cnIndexRecord.getBaseDN();
+      final MultiDomainServerState cookie = new MultiDomainServerState(cnIndexRecord.getPreviousCookie());
+      cookie.update(baseDN, cnIndexRecord.getCSN());
+      final String cookieString = cookie.toString();
+
+      sendEntryData.initialSearchSendsEntry(cnIndexRecord.getChangeNumber());
+      final Entry entry = createEntryFromMsg(baseDN, cnIndexRecord.getChangeNumber(), cookieString, updateMsg);
+      return sendEntryIfMatches(searchOp, entry, null);
+    }
+
+    private void persistentSearchSendEntry(long changeNumber, Entry entry) throws DirectoryException
+    {
+      if (sendEntryData.persistentSearchCanSendEntry(changeNumber))
+      {
+        sendEntryIfMatches(searchOp, entry, null);
+      }
+    }
+  }
+
+  /** Sends entries to clients for cookie-based searches. */
+  private static class CookieEntrySender {
+    private final SearchOperation searchOp;
+    private final ConcurrentSkipListMap<Pair<DN, Integer>, SendEntryData<CSN>> replicaIdToSendEntryData =
+        new ConcurrentSkipListMap<Pair<DN, Integer>, SendEntryData<CSN>>(Pair.COMPARATOR);
+
+    private CookieEntrySender(SearchOperation searchOp)
+    {
+      this.searchOp = searchOp;
+    }
+
+    public void finalizeInitialSearch()
+    {
+      for (SendEntryData<CSN> sendEntryData : replicaIdToSendEntryData.values())
+      {
+        sendEntryData.finalizeInitialSearch();
+      }
+    }
+
+    public void transitioningToPersistentSearchPhase()
+    {
+      for (SendEntryData<CSN> sendEntryData : replicaIdToSendEntryData.values())
+      {
+        sendEntryData.transitioningToPersistentSearchPhase();
+      }
+    }
+
+    private SendEntryData<CSN> getSendEntryData(DN baseDN, CSN csn)
+    {
+      final Pair<DN, Integer> replicaId = Pair.of(baseDN, csn.getServerId());
+      SendEntryData<CSN> data = replicaIdToSendEntryData.get(replicaId);
+      if (data == null)
+      {
+        final SendEntryData<CSN> newData = new SendEntryData<CSN>();
+        data = replicaIdToSendEntryData.putIfAbsent(replicaId, newData);
+        return data == null ? newData : data;
+      }
+      return data;
+    }
+
+    private boolean initialSearchSendEntry(final UpdateMsg updateMsg, final DN baseDN,
+        final MultiDomainServerState cookie) throws DirectoryException
+    {
+      final CSN csn = updateMsg.getCSN();
+      final SendEntryData<CSN> sendEntryData = getSendEntryData(baseDN, csn);
+      sendEntryData.initialSearchSendsEntry(csn);
+      final String cookieString = updateCookie(cookie, baseDN, updateMsg.getCSN());
+      final Entry entry = createEntryFromMsg(baseDN, 0, cookieString, updateMsg);
+      return sendEntryIfMatches(searchOp, entry, cookieString);
+    }
+
+    private void persistentSearchSendEntry(DN baseDN, UpdateMsg updateMsg)
+        throws DirectoryException
+    {
+      final CSN csn = updateMsg.getCSN();
+      final SendEntryData<CSN> sendEntryData = getSendEntryData(baseDN, csn);
+      if (sendEntryData.persistentSearchCanSendEntry(csn))
+      {
+        // multi threaded case: wait for the "initial search" phase to set the cookie
+        final MultiDomainServerState cookie = searchOp.getAttachment(COOKIE_ATTACHMENT);
+
+        final String cookieString = updateCookie(cookie, baseDN, updateMsg.getCSN());
+        final Entry cookieEntry = createEntryFromMsg(baseDN, 0, cookieString, updateMsg);
+        // FIXME JNR use this instead of previous line:
+        // entry.replaceAttribute(Attributes.create("changelogcookie", cookieString));
+        sendEntryIfMatches(searchOp, cookieEntry, cookieString);
+      }
+    }
+
+    private String updateCookie(final MultiDomainServerState cookie, DN baseDN, final CSN csn)
+    {
+      synchronized (cookie)
+      { // forbid concurrent updates to the cookie
+        cookie.update(baseDN, csn);
+        return cookie.toString();
+      }
+    }
+  }
 }

--
Gitblit v1.10.0