From 26976832d0dc62a838dce055ca8b9e0cc5fb272a Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Thu, 04 Sep 2014 14:14:08 +0000
Subject: [PATCH] OPENDJ-1540 (CR-4436) Persistent searches on cn=changelog with changesOnly=true should not return the base changelog entry

---
 opendj-sdk/opends/src/server/org/opends/server/backends/ChangelogBackend.java |  507 ++++++++++++++++++++++++++++----------------------------
 1 files changed, 255 insertions(+), 252 deletions(-)

diff --git a/opendj-sdk/opends/src/server/org/opends/server/backends/ChangelogBackend.java b/opendj-sdk/opends/src/server/org/opends/server/backends/ChangelogBackend.java
index 9d7613f..d743e64 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/backends/ChangelogBackend.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/backends/ChangelogBackend.java
@@ -25,20 +25,19 @@
  */
 package org.opends.server.backends;
 
-import static org.opends.messages.BackendMessages.*;
-import static org.opends.messages.ReplicationMessages.*;
-import static org.opends.server.config.ConfigConstants.*;
-import static org.opends.server.loggers.ErrorLogger.*;
-import static org.opends.server.loggers.debug.DebugLogger.*;
-import static org.opends.server.replication.plugin.MultimasterReplication.*;
-import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*;
-import static org.opends.server.util.LDIFWriter.*;
-import static org.opends.server.util.ServerConstants.*;
-import static org.opends.server.util.StaticUtils.*;
-
 import java.text.SimpleDateFormat;
-import java.util.*;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TimeZone;
 
 import org.opends.messages.Category;
 import org.opends.messages.Message;
@@ -49,7 +48,13 @@
 import org.opends.server.config.ConfigException;
 import org.opends.server.controls.EntryChangelogNotificationControl;
 import org.opends.server.controls.ExternalChangelogRequestControl;
-import org.opends.server.core.*;
+import org.opends.server.core.AddOperation;
+import org.opends.server.core.DeleteOperation;
+import org.opends.server.core.DirectoryServer;
+import org.opends.server.core.ModifyDNOperation;
+import org.opends.server.core.ModifyOperation;
+import org.opends.server.core.PersistentSearch;
+import org.opends.server.core.SearchOperation;
 import org.opends.server.loggers.debug.DebugTracer;
 import org.opends.server.replication.common.CSN;
 import org.opends.server.replication.common.MultiDomainServerState;
@@ -71,10 +76,51 @@
 import org.opends.server.replication.server.changelog.je.ECLEnabledDomainPredicate;
 import org.opends.server.replication.server.changelog.je.ECLMultiDomainDBCursor;
 import org.opends.server.replication.server.changelog.je.MultiDomainDBCursor;
-import org.opends.server.types.*;
-import org.opends.server.util.ServerConstants;
+import org.opends.server.types.Attribute;
+import org.opends.server.types.AttributeType;
+import org.opends.server.types.AttributeValue;
+import org.opends.server.types.Attributes;
+import org.opends.server.types.BackupConfig;
+import org.opends.server.types.BackupDirectory;
+import org.opends.server.types.ByteString;
+import org.opends.server.types.CanceledOperationException;
+import org.opends.server.types.ConditionResult;
+import org.opends.server.types.Control;
+import org.opends.server.types.DN;
+import org.opends.server.types.DebugLogLevel;
+import org.opends.server.types.DirectoryConfig;
+import org.opends.server.types.DirectoryException;
+import org.opends.server.types.Entry;
+import org.opends.server.types.FilterType;
+import org.opends.server.types.IndexType;
+import org.opends.server.types.InitializationException;
+import org.opends.server.types.LDIFExportConfig;
+import org.opends.server.types.LDIFImportConfig;
+import org.opends.server.types.LDIFImportResult;
+import org.opends.server.types.Modification;
+import org.opends.server.types.ModificationType;
+import org.opends.server.types.ObjectClass;
+import org.opends.server.types.Privilege;
+import org.opends.server.types.RDN;
+import org.opends.server.types.RawAttribute;
+import org.opends.server.types.RestoreConfig;
+import org.opends.server.types.ResultCode;
+import org.opends.server.types.SearchFilter;
+import org.opends.server.types.SearchScope;
+import org.opends.server.types.WritabilityMode;
 import org.opends.server.util.StaticUtils;
 
+import static org.opends.messages.BackendMessages.*;
+import static org.opends.messages.ReplicationMessages.*;
+import static org.opends.server.config.ConfigConstants.*;
+import static org.opends.server.loggers.ErrorLogger.*;
+import static org.opends.server.loggers.debug.DebugLogger.*;
+import static org.opends.server.replication.plugin.MultimasterReplication.*;
+import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*;
+import static org.opends.server.util.LDIFWriter.*;
+import static org.opends.server.util.ServerConstants.*;
+import static org.opends.server.util.StaticUtils.*;
+
 /**
  * A backend that provides access to the changelog, i.e. the "cn=changelog"
  * suffix. It is a read-only backend that is created by a
@@ -92,6 +138,36 @@
  * from the ReplicasDBs. The <code>changeNumber</code> attribute value is set
  * from the content of ChangeNumberIndexDB.</li>
  * </ul>
+ * <h3>Searches flow</h3>
+ * <p>
+ * Here is the flow of searches within the changelog backend APIs:
+ * <ul>
+ * <li>Normal searches only go through:
+ * <ol>
+ * <li>{@link ChangelogBackend#search(SearchOperation)} (once, single threaded)</li>
+ * </ol>
+ * </li>
+ * <li>Persistent searches with <code>changesOnly=false</code> go through:
+ * <ol>
+ * <li>{@link ChangelogBackend#registerPersistentSearch(PersistentSearch)}
+ * (once, single threaded),</li>
+ * <li>
+ * {@link ChangelogBackend#search(SearchOperation)} (once, single threaded)</li>
+ * <li>
+ * {@link ChangelogBackend#notifyEntryAdded(DN, long, String, UpdateMsg)}
+ * (multiple times, multi threaded)</li>
+ * </ol>
+ * </li>
+ * <li>Persistent searches with <code>changesOnly=true</code> go through:
+ * <ol>
+ * <li>{@link ChangelogBackend#registerPersistentSearch(PersistentSearch)}
+ * (once, single threaded)</li>
+ * <li>
+ * {@link ChangelogBackend#notifyEntryAdded(DN, long, String, UpdateMsg)}
+ * (multiple times, multi threaded)</li>
+ * </ol>
+ * </li>
+ * </ul>
  *
  * @see ReplicationServer
  */
@@ -152,13 +228,13 @@
 
   /** The set of base DNs for this backend. */
   private DN[] baseDNs;
-
   /** The set of supported controls for this backend. */
   private final Set<String> supportedControls = Collections.singleton(OID_ECL_COOKIE_EXCHANGE_CONTROL);
+  /** Whether the base changelog entry has subordinates. */
+  private Boolean baseEntryHasSubordinates;
 
   /** The replication server on which the changelog is read. */
   private final ReplicationServer replicationServer;
-
   private final ECLEnabledDomainPredicate domainPredicate;
 
   /**
@@ -215,7 +291,7 @@
     catch (final DirectoryException e)
     {
       throw new InitializationException(
-          ERR_BACKEND_CANNOT_REGISTER_BASEDN.get(CHANGELOG_BASE_DN.toString(), getExceptionMessage(e)), e);
+          ERR_BACKEND_CANNOT_REGISTER_BASEDN.get(DN_EXTERNAL_CHANGELOG_ROOT, getExceptionMessage(e)), e);
     }
   }
 
@@ -280,101 +356,60 @@
 
   /** {@inheritDoc} */
   @Override
-  public ConditionResult hasSubordinates(final DN entryDN)
-      throws DirectoryException
+  public ConditionResult hasSubordinates(final DN entryDN) throws DirectoryException
   {
-    final long num = numSubordinates(entryDN, false);
-    if (num < 0)
+    if (CHANGELOG_BASE_DN.equals(entryDN))
     {
-      return ConditionResult.UNDEFINED;
+      final Boolean hasSubs = baseChangelogHasSubordinates();
+      if (hasSubs == null)
+      {
+        return ConditionResult.UNDEFINED;
+      }
+      return hasSubs ? ConditionResult.TRUE : ConditionResult.FALSE;
     }
-    else if (num == 0)
-    {
-      return ConditionResult.FALSE;
-    }
-    else
-    {
-      return ConditionResult.TRUE;
-    }
+    return ConditionResult.FALSE;
   }
 
-  /** Specific search operation to count number of entries. */
-  private final class NumSubordinatesSearchOperation extends SearchOperationWrapper
+  private Boolean baseChangelogHasSubordinates() throws DirectoryException
   {
-    private long numSubordinates = -1;
-
-    private NumSubordinatesSearchOperation()
+    if (baseEntryHasSubordinates == null)
     {
-      super(null);
+      // compute its value
+      try
+      {
+        final ReplicationDomainDB replicationDomainDB = getChangelogDB().getReplicationDomainDB();
+        final MultiDomainDBCursor cursor =
+            replicationDomainDB.getCursorFrom(null, ON_MATCHING_KEY, getExcludedBaseDNs());
+        try
+        {
+          baseEntryHasSubordinates = cursor.next();
+        }
+        finally
+        {
+          close(cursor);
+        }
+      }
+      catch (ChangelogException e)
+      {
+        throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, ERR_CHANGELOG_BACKEND_ATTRIBUTE.get(
+            "hasSubordinates", DN_EXTERNAL_CHANGELOG_ROOT, stackTraceToSingleLineString(e)));
+      }
     }
-
-    @Override
-    public boolean returnEntry(Entry entry, List<Control> controls)
-    {
-      numSubordinates++;
-      return true;
-    }
-
-    @Override
-    public DN getBaseDN()
-    {
-      return CHANGELOG_BASE_DN;
-    }
-
-    @Override
-    public SearchFilter getFilter()
-    {
-      return LDAPURL.DEFAULT_SEARCH_FILTER;
-    }
-
-    @Override
-    public SearchScope getScope()
-    {
-      return SearchScope.WHOLE_SUBTREE;
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public Object setAttachment(String name, Object value)
-    {
-      return null;
-    }
+    return baseEntryHasSubordinates;
   }
 
   /** {@inheritDoc} */
   @Override
   public long numSubordinates(final DN entryDN, final boolean subtree) throws DirectoryException
   {
-    // Compute the num subordinates only for the base DN
-    if (entryDN == null || !CHANGELOG_BASE_DN.equals(entryDN))
-    {
-      return -1;
-    }
-    if (!subtree)
-    {
-      return 1;
-    }
-
-    // Search with cookie mode to count all update messages cross replica
-    final SearchParams params = new SearchParams(getExcludedChangelogDomains());
-    params.cookie = new MultiDomainServerState();
-    try
-    {
-      final NumSubordinatesSearchOperation searchOp = new NumSubordinatesSearchOperation();
-      search0(params, searchOp);
-      return searchOp.numSubordinates;
-    }
-    catch (ChangelogException e)
-    {
-      throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, ERR_CHANGELOG_BACKEND_NUM_SUBORDINATES.get(
-          CHANGELOG_BASE_DN.toString(), stackTraceToSingleLineString(e)));
-    }
+    return -1;
   }
 
   /**
    * Notifies persistent searches of this backend that a new entry was added to it.
    * <p>
-   * Note: This method is called in a multi-threaded context.
+   * Note: This method correspond to the "persistent search" phase.
+   * It is executed multiple times per persistent search, multi-threaded, until the persistent search is cancelled.
    *
    * @param baseDN
    *          the baseDN of the newly added entry.
@@ -405,7 +440,7 @@
       final Entry entry = createEntryFromMsg(baseDN, changeNumber, cookieString, updateMsg);
       for (SearchOperation pSearchOp : pSearchOps)
       {
-        final EntrySender entrySender = (EntrySender)
+        final MultiDomainServerState cookie = (MultiDomainServerState)
             pSearchOp.getAttachment(OID_ECL_COOKIE_EXCHANGE_CONTROL);
 
         // when returning changesOnly, the first incoming update must return
@@ -414,28 +449,19 @@
         if (isCookieEntry)
         { // cookie based search
           final String cookieStr;
-          synchronized (entrySender)
+          synchronized (cookie)
           { // forbid concurrent updates to the cookie
-            entrySender.cookie.update(baseDN, updateMsg.getCSN());
-            cookieStr = entrySender.cookie.toString();
-
-            entrySender.sendBaseChangelogEntry(true);
+            cookie.update(baseDN, updateMsg.getCSN());
+            cookieStr = cookie.toString();
           }
           final Entry entry2 = createEntryFromMsg(baseDN, changeNumber, cookieStr, updateMsg);
           // FIXME JNR use this instead of previous line:
           // entry.replaceAttribute(Attributes.create("changelogcookie", cookieStr));
-          entrySender.sendEntryIfMatches(entry2, cookieStr);
+          sendEntryIfMatches(pSearchOp, entry2, cookieStr);
         }
         else
         { // draft changeNumber search
-          if (!entrySender.hasReturnedBaseEntry.get())
-          {
-            synchronized (entrySender)
-            {
-              entrySender.sendBaseChangelogEntry(true);
-            }
-          }
-          entrySender.sendEntryIfMatches(entry, null);
+          sendEntryIfMatches(pSearchOp, entry, null);
         }
       }
     }
@@ -520,7 +546,7 @@
     optimizeSearchParameters(params, searchOperation.getBaseDN(), searchOperation.getFilter());
     try
     {
-      search0(params, searchOperation);
+      initialSearch(params, searchOperation);
     }
     catch (ChangelogException e)
     {
@@ -533,7 +559,7 @@
 
   private SearchParams buildSearchParameters(final SearchOperation searchOperation) throws DirectoryException
   {
-    final SearchParams params = new SearchParams(getExcludedChangelogDomains());
+    final SearchParams params = new SearchParams(getExcludedBaseDNs());
     final ExternalChangelogRequestControl eclRequestControl =
         searchOperation.getRequestControl(ExternalChangelogRequestControl.DECODER);
     if (eclRequestControl != null)
@@ -659,7 +685,7 @@
    */
   static class SearchParams
   {
-    private final Set<String> excludedBaseDNs;
+    private final Set<DN> excludedBaseDNs;
     private long lowestChangeNumber = -1;
     private long highestChangeNumber = -1;
     private CSN csn = new CSN(0, 0, 0);
@@ -670,7 +696,7 @@
      */
     SearchParams()
     {
-      this(Collections.<String> emptySet());
+      this(Collections.<DN> emptySet());
     }
 
     /**
@@ -679,7 +705,7 @@
      * @param excludedBaseDNs
      *          Set of DNs to exclude from search.
      */
-    SearchParams(final Set<String> excludedBaseDNs)
+    SearchParams(final Set<DN> excludedBaseDNs)
     {
       this.excludedBaseDNs = excludedBaseDNs;
     }
@@ -746,16 +772,27 @@
      * @throws DirectoryException
      *           If a DN can't be decoded.
      */
-    Set<DN> getExcludedBaseDNs() throws DirectoryException
+    Set<DN> getExcludedBaseDNs()
     {
-      final Set<DN> excludedDNs = new HashSet<DN>();
-      for (String dn : excludedBaseDNs)
-      {
-        excludedDNs.add(DN.decode(dn));
-      }
-      return excludedDNs;
+      return excludedBaseDNs;
     }
+  }
 
+  /**
+   * Returns the set of DNs to exclude from the search.
+   *
+   * @return the DNs corresponding to domains to exclude from the search.
+   * @throws DirectoryException
+   *           If a DN can't be decoded.
+   */
+  private static Set<DN> getExcludedBaseDNs() throws DirectoryException
+  {
+    final Set<DN> excludedDNs = new HashSet<DN>();
+    for (String dn : getExcludedChangelogDomains())
+    {
+      excludedDNs.add(DN.decode(dn));
+    }
+    return excludedDNs;
   }
 
   /**
@@ -902,32 +939,46 @@
            && filter.getAttributeType().getPrimaryName().equalsIgnoreCase(primaryName);
   }
 
-  private void search0(final SearchParams searchParams, final SearchOperation searchOperation)
+  /**
+   * Runs the "initial search" phase (as opposed to a "persistent search" phase).
+   * The "initial search" phase is the only search run by normal searches,
+   * but it is also run by persistent searches with <code>changesOnly=false</code>.
+   * Persistent searches with <code>changesOnly=true</code> never execute this code.
+   * <p>
+   * Note: this method is executed only once per persistent search, single threaded.
+   */
+  private void initialSearch(final SearchParams searchParams, final SearchOperation searchOperation)
       throws DirectoryException, ChangelogException
   {
     if (searchParams.isCookieBasedSearch())
     {
-      searchFromCookie(searchParams, searchOperation);
+      initialSearchFromCookie(searchParams, searchOperation);
     }
     else
     {
-      searchFromChangeNumber(searchParams, searchOperation);
+      initialSearchFromChangeNumber(searchParams, searchOperation);
     }
   }
 
   /**
    * Search the changelog when a cookie control is provided.
    */
-  private void searchFromCookie(final SearchParams searchParams, final SearchOperation searchOperation)
+  private void initialSearchFromCookie(final SearchParams searchParams, final SearchOperation searchOperation)
       throws DirectoryException, ChangelogException
   {
     validateProvidedCookie(searchParams);
-    final boolean isPersistentSearch = isPersistentSearch(searchOperation);
 
-    final EntrySender entrySender = new EntrySender(searchOperation, searchParams.cookie);
-    if (isPersistentSearch)
+    // send the base changelog entry immediately even for changesOnly=false persistent searches
+    if (!sendBaseChangelogEntry(searchOperation))
     {
-      searchOperation.setAttachment(OID_ECL_COOKIE_EXCHANGE_CONTROL, entrySender);
+      // only return the base entry: stop here
+      return;
+    }
+
+    if (isPersistentSearch(searchOperation))
+    {
+      // communicate the cookie between the "initial search" phase and the "persistent search" phase
+      searchOperation.setAttachment(OID_ECL_COOKIE_EXCHANGE_CONTROL, searchParams.cookie);
     }
 
     ECLMultiDomainDBCursor replicaUpdatesCursor = null;
@@ -948,13 +999,7 @@
         final String cookieString = searchParams.cookie.toString();
 
         final Entry entry = createEntryFromMsg(domainBaseDN, 0, cookieString, updateMsg);
-        continueSearch = entrySender.sendEntryIfMatches(entry, cookieString);
-      }
-
-      if (!isPersistentSearch)
-      {
-        // send the base changelog entry if no update message is found
-        entrySender.sendBaseChangelogEntry(false);
+        continueSearch = sendEntryIfMatches(searchOperation, entry, cookieString);
       }
     }
     finally
@@ -979,16 +1024,15 @@
   @Override
   public void registerPersistentSearch(PersistentSearch pSearch)
   {
-    super.registerPersistentSearch(pSearch);
-
     final SearchOperation searchOp = pSearch.getSearchOperation();
     if (pSearch.isChangesOnly())
     {
-      // this persistent search will not go through #search0() down below
-      // so we must initialize the cookie here
-      searchOp.setAttachment(OID_ECL_COOKIE_EXCHANGE_CONTROL,
-          new EntrySender(searchOp, getNewestCookie(searchOp)));
+      // this changesOnly persistent search will not go through #search0() down below
+      // so we must initialize the entrySender here and never return the base entry
+      searchOp.setAttachment(OID_ECL_COOKIE_EXCHANGE_CONTROL, getNewestCookie(searchOp));
     }
+
+    super.registerPersistentSearch(pSearch);
   }
 
   private MultiDomainServerState getNewestCookie(SearchOperation searchOp)
@@ -1028,15 +1072,11 @@
   /**
    * Search the changelog using change number(s).
    */
-  private void searchFromChangeNumber(final SearchParams params, final SearchOperation searchOperation)
+  private void initialSearchFromChangeNumber(final SearchParams params, final SearchOperation searchOperation)
       throws ChangelogException, DirectoryException
   {
-    final EntrySender entrySender = new EntrySender(searchOperation, null);
-    final boolean isPersistentSearch = isPersistentSearch(searchOperation);
-    if (isPersistentSearch)
-    {
-      searchOperation.setAttachment(OID_ECL_COOKIE_EXCHANGE_CONTROL, entrySender);
-    }
+    // "initial search" phase must return the base entry immediately
+    sendBaseChangelogEntry(searchOperation);
 
     DBCursor<ChangeNumberIndexRecord> cnIndexDBCursor = null;
     MultiDomainDBCursor replicaUpdatesCursor = null;
@@ -1058,17 +1098,11 @@
           UpdateMsg updateMsg = findReplicaUpdateMessage(cnIndexRecord, replicaUpdatesCursor);
           if (updateMsg != null)
           {
-            continueSearch = sendEntryForUpdateMessage(entrySender, cnIndexRecord, updateMsg);
+            continueSearch = sendEntryForUpdateMessage(searchOperation, cnIndexRecord, updateMsg);
             replicaUpdatesCursor.next();
           }
         }
       }
-
-      if (!isPersistentSearch)
-      {
-        // send the base changelog entry if no update message is found
-        entrySender.sendBaseChangelogEntry(false);
-      }
     }
     finally
     {
@@ -1079,7 +1113,7 @@
   /**
    * @return {@code true} if search should continue, {@code false} otherwise
    */
-  private boolean sendEntryForUpdateMessage(EntrySender entrySender,
+  private boolean sendEntryForUpdateMessage(SearchOperation searchOperation,
       ChangeNumberIndexRecord cnIndexRecord, UpdateMsg updateMsg) throws DirectoryException
   {
     final DN baseDN = cnIndexRecord.getBaseDN();
@@ -1088,7 +1122,7 @@
     final String cookieString = cookie.toString();
 
     final Entry entry = createEntryFromMsg(baseDN, cnIndexRecord.getChangeNumber(), cookieString, updateMsg);
-    return entrySender.sendEntryIfMatches(entry, null);
+    return sendEntryIfMatches(searchOperation, entry, null);
   }
 
   private MultiDomainDBCursor initializeReplicaUpdatesCursor(
@@ -1402,113 +1436,82 @@
   }
 
   /**
-   * Used to send entries to searches on cn=changelog. This class ensures the
-   * base changelog entry is sent before sending any other entry. It is also
-   * used as a store when going from the "initial search" phase to the
-   * "persistent search" phase.
+   * Sends the entry if it matches the base, scope and filter of the current search operation.
+   * It will also send the base changelog entry if it needs to be sent and was not sent before.
+   *
+   * @return {@code true} if search should continue, {@code false} otherwise
    */
-  private static class EntrySender
+  private boolean sendEntryIfMatches(SearchOperation searchOp, Entry entry, String cookie) throws DirectoryException
   {
-
-    private final SearchOperation searchOp;
-    /**
-     * Used by the cookie-based searches to communicate the cookie between the
-     * initial search phase and the persistent search phase. This is unused with
-     * draft change number searches.
-     */
-    private final MultiDomainServerState cookie;
-    private final AtomicBoolean hasReturnedBaseEntry = new AtomicBoolean();
-
-    public EntrySender(SearchOperation searchOp, MultiDomainServerState cookie)
+    if (matchBaseAndScopeAndFilter(searchOp, entry))
     {
-      this.searchOp = searchOp;
-      this.cookie = cookie;
+      return searchOp.returnEntry(entry, getControls(cookie));
     }
+    // maybe the next entry will match?
+    return true;
+  }
 
-    /**
-     * Sends the entry if it matches the base, scope and filter of the current search operation.
-     * It will also send the base changelog entry if it needs to be sent and was not sent before.
-     *
-     * @return {@code true} if search should continue, {@code false} otherwise
-     */
-    private boolean sendEntryIfMatches(Entry entry, String cookie) throws DirectoryException
+  /** Indicates if the provided entry matches the filter, base and scope. */
+  private boolean matchBaseAndScopeAndFilter(SearchOperation searchOp, Entry entry) throws DirectoryException
+  {
+    return entry.matchesBaseAndScope(searchOp.getBaseDN(), searchOp.getScope())
+        && searchOp.getFilter().matchesEntry(entry);
+  }
+
+  private List<Control> getControls(String cookie)
+  {
+    if (cookie != null)
     {
-      // About to send one entry: ensure the base changelog entry is sent first
-      if (!sendBaseChangelogEntry(true))
+      Control c = new EntryChangelogNotificationControl(true, cookie);
+      return Arrays.asList(c);
+    }
+    return Collections.emptyList();
+  }
+
+  /**
+   * Create and returns the base changelog entry to the underlying search operation.
+   *
+   * @return {@code true} if search should continue, {@code false} otherwise
+   */
+  private boolean sendBaseChangelogEntry(SearchOperation searchOp) throws DirectoryException
+  {
+    final DN baseDN = searchOp.getBaseDN();
+    final SearchFilter filter = searchOp.getFilter();
+    final SearchScope scope = searchOp.getScope();
+
+    if (ChangelogBackend.CHANGELOG_BASE_DN.matchesBaseAndScope(baseDN, scope))
+    {
+      final Entry entry = buildBaseChangelogEntry();
+      if (filter.matchesEntry(entry) && !searchOp.returnEntry(entry, null))
       {
-        // only return the base entry: stop here
+        // Abandon, size limit reached.
         return false;
       }
-      if (matchBaseAndScopeAndFilter(entry))
-      {
-        return searchOp.returnEntry(entry, getControls(cookie));
-      }
-      // maybe the next entry will match?
-      return true;
     }
+    return !baseDN.equals(ChangelogBackend.CHANGELOG_BASE_DN)
+        || !scope.equals(SearchScope.BASE_OBJECT);
+  }
 
-    /** Indicates if the provided entry matches the filter, base and scope. */
-    private boolean matchBaseAndScopeAndFilter(Entry entry) throws DirectoryException
-    {
-      return entry.matchesBaseAndScope(searchOp.getBaseDN(), searchOp.getScope())
-          && searchOp.getFilter().matchesEntry(entry);
-    }
+  private Entry buildBaseChangelogEntry() throws DirectoryException
+  {
+    final String hasSubordinatesStr = Boolean.toString(baseChangelogHasSubordinates());
 
-    private List<Control> getControls(String cookie)
-    {
-      if (cookie != null)
-      {
-        Control c = new EntryChangelogNotificationControl(true, cookie);
-        return Arrays.asList(c);
-      }
-      return Collections.emptyList();
-    }
+    final Map<AttributeType, List<Attribute>> userAttrs = new LinkedHashMap<AttributeType, List<Attribute>>();
+    final Map<AttributeType, List<Attribute>> operationalAttrs = new LinkedHashMap<AttributeType, List<Attribute>>();
 
-    /**
-     * Create and returns the base changelog entry to the underlying search operation.
-     *
-     * @return {@code true} if search should continue, {@code false} otherwise
-     */
-    private boolean sendBaseChangelogEntry(boolean hasSubordinates) throws DirectoryException
-    {
-      if (hasReturnedBaseEntry.compareAndSet(false, true))
-      {
-        final DN baseDN = searchOp.getBaseDN();
-        final SearchFilter filter = searchOp.getFilter();
-        final SearchScope scope = searchOp.getScope();
+    // We never return the numSubordinates attribute for the base changelog entry
+    // and there is a very good reason for that:
+    // - Either we compute it before sending the entries,
+    // -- then we risk returning more entries if new entries come in after we computed numSubordinates
+    // --   or we risk returning less entries if purge kicks in      after we computed numSubordinates
+    // - Or we accumulate all the entries that must be returned before sending them => OutOfMemoryError
 
-        if (ChangelogBackend.CHANGELOG_BASE_DN.matchesBaseAndScope(baseDN, scope))
-        {
-          final Entry entry = buildBaseChangelogEntry(hasSubordinates);
-          if (filter.matchesEntry(entry) && !searchOp.returnEntry(entry, null))
-          {
-            // Abandon, size limit reached.
-            return false;
-          }
-        }
-        return !baseDN.equals(ChangelogBackend.CHANGELOG_BASE_DN)
-            || !scope.equals(SearchScope.BASE_OBJECT);
-      }
-      return true;
-    }
-
-    private Entry buildBaseChangelogEntry(boolean hasSubordinates)
-    {
-      final Map<AttributeType, List<Attribute>> userAttrs =
-          new LinkedHashMap<AttributeType, List<Attribute>>();
-      final Map<AttributeType, List<Attribute>> operationalAttrs =
-          new LinkedHashMap<AttributeType, List<Attribute>>();
-
-      addAttributeByUppercaseName(ATTR_COMMON_NAME, ATTR_COMMON_NAME,
-          ChangelogBackend.BACKEND_ID, userAttrs, operationalAttrs);
-      addAttributeByUppercaseName(ATTR_SUBSCHEMA_SUBENTRY_LC, ATTR_SUBSCHEMA_SUBENTRY,
-          ConfigConstants.DN_DEFAULT_SCHEMA_ROOT, userAttrs, operationalAttrs);
-      addAttributeByUppercaseName("hassubordinates", "hasSubordinates",
-          Boolean.toString(hasSubordinates), userAttrs, operationalAttrs);
-      addAttributeByUppercaseName("entrydn", "entryDN",
-          ServerConstants.DN_EXTERNAL_CHANGELOG_ROOT, userAttrs, operationalAttrs);
-      return new Entry(CHANGELOG_BASE_DN, CHANGELOG_ROOT_OBJECT_CLASSES, userAttrs, operationalAttrs);
-    }
+    addAttributeByUppercaseName(ATTR_COMMON_NAME, ATTR_COMMON_NAME, BACKEND_ID, userAttrs, operationalAttrs);
+    addAttributeByUppercaseName(ATTR_SUBSCHEMA_SUBENTRY_LC, ATTR_SUBSCHEMA_SUBENTRY,
+        ConfigConstants.DN_DEFAULT_SCHEMA_ROOT, userAttrs, operationalAttrs);
+    addAttributeByUppercaseName("hassubordinates", "hasSubordinates", hasSubordinatesStr, userAttrs, operationalAttrs);
+    addAttributeByUppercaseName("entrydn", "entryDN", DN_EXTERNAL_CHANGELOG_ROOT, userAttrs, operationalAttrs);
+    return new Entry(CHANGELOG_BASE_DN, CHANGELOG_ROOT_OBJECT_CLASSES, userAttrs, operationalAttrs);
   }
 
   private static void addAttribute(final Entry e, final String attrType, final String attrValue)

--
Gitblit v1.10.0