From 02bbeacbfb05101989dac510cbef7815fdf28a2e Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Mon, 01 Sep 2014 12:51:46 +0000
Subject: [PATCH] OPENDJ-1206 (CR-4393) Create a new ReplicationBackend/ChangelogBackend to support cn=changelog

---
 opends/src/server/org/opends/server/backends/ChangelogBackend.java |  569 +++++++++++++++++++++++++++++++++++++++-----------------
 1 files changed, 394 insertions(+), 175 deletions(-)

diff --git a/opends/src/server/org/opends/server/backends/ChangelogBackend.java b/opends/src/server/org/opends/server/backends/ChangelogBackend.java
index 0f0da2a..6808a0a 100644
--- a/opends/src/server/org/opends/server/backends/ChangelogBackend.java
+++ b/opends/src/server/org/opends/server/backends/ChangelogBackend.java
@@ -29,8 +29,7 @@
 import static org.opends.messages.ReplicationMessages.*;
 import static org.opends.server.config.ConfigConstants.*;
 import static org.opends.server.loggers.ErrorLogger.*;
-import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
-import static org.opends.server.loggers.debug.DebugLogger.getTracer;
+import static org.opends.server.loggers.debug.DebugLogger.*;
 import static org.opends.server.replication.protocol.StartECLSessionMsg.ECLRequestType.*;
 import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*;
 import static org.opends.server.util.LDIFWriter.*;
@@ -39,6 +38,7 @@
 
 import java.text.SimpleDateFormat;
 import java.util.*;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.opends.messages.Category;
 import org.opends.messages.Message;
@@ -53,6 +53,7 @@
 import org.opends.server.loggers.debug.DebugTracer;
 import org.opends.server.replication.common.CSN;
 import org.opends.server.replication.common.MultiDomainServerState;
+import org.opends.server.replication.common.ServerState;
 import org.opends.server.replication.plugin.MultimasterReplication;
 import org.opends.server.replication.protocol.AddMsg;
 import org.opends.server.replication.protocol.DeleteMsg;
@@ -62,6 +63,7 @@
 import org.opends.server.replication.protocol.StartECLSessionMsg.ECLRequestType;
 import org.opends.server.replication.protocol.UpdateMsg;
 import org.opends.server.replication.server.ReplicationServer;
+import org.opends.server.replication.server.ReplicationServerDomain;
 import org.opends.server.replication.server.changelog.api.ChangeNumberIndexDB;
 import org.opends.server.replication.server.changelog.api.ChangeNumberIndexRecord;
 import org.opends.server.replication.server.changelog.api.ChangelogDB;
@@ -72,10 +74,11 @@
 import org.opends.server.replication.server.changelog.je.ECLMultiDomainDBCursor;
 import org.opends.server.replication.server.changelog.je.MultiDomainDBCursor;
 import org.opends.server.types.*;
+import org.opends.server.util.ServerConstants;
 import org.opends.server.util.StaticUtils;
 
 /**
- * A backend that provides access to the changelog, ie the "cn=changelog"
+ * A backend that provides access to the changelog, i.e. the "cn=changelog"
  * suffix. It is a read-only backend that is created by a
  * {@code ReplicationServer} and is not configurable.
  * <p>
@@ -85,8 +88,8 @@
  * request. The cookie provided in the control is used to retrieve entries from
  * the ReplicaDBs. The <code>changeNumber</code> attribute is not returned with
  * the entries.</li>
- * <li>Draft compat mode: when no "ECL Cookie Exchange Control" is provided with
- * the request. The entries are retrieved using the ChangeNumberIndexDB (or
+ * <li>Draft compatibility mode: when no "ECL Cookie Exchange Control" is provided
+ * with the request. The entries are retrieved using the ChangeNumberIndexDB (or
  * DraftDB, hence the name) and their attributes are set with the information
  * from the ReplicasDBs. The <code>changeNumber</code> attribute value is set
  * from the content of ChangeNumberIndexDB.</li>
@@ -134,8 +137,20 @@
   private static final AttributeType MODIFIERS_NAME_TYPE =
       DirectoryConfig.getAttributeType(OP_ATTR_MODIFIERS_NAME_LC, true);
 
-  /** The DN for the base changelog entry. */
-  private DN baseChangelogDN;
+  /** The base DN for the external change log. */
+  public static final DN CHANGELOG_BASE_DN;
+
+  static
+  {
+    try
+    {
+      CHANGELOG_BASE_DN = DN.decode(DN_EXTERNAL_CHANGELOG_ROOT);
+    }
+    catch (DirectoryException e)
+    {
+      throw new RuntimeException(e);
+    }
+  }
 
   /** The set of base DNs for this backend. */
   private DN[] baseDNs;
@@ -149,7 +164,7 @@
   private final ECLEnabledDomainPredicate domainPredicate;
 
   /**
-   * Creates a new backend with the provided repication server.
+   * Creates a new backend with the provided replication server.
    *
    * @param replicationServer
    *          The replication server on which the changes are read.
@@ -165,6 +180,23 @@
     setPrivateBackend(true);
   }
 
+  private ChangelogDB getChangelogDB()
+  {
+    return replicationServer.getChangelogDB();
+  }
+
+  /**
+   * Returns the ChangelogBackend configured for "cn=changelog" in this directory server.
+   *
+   * @return the ChangelogBackend configured for "cn=changelog" in this directory server
+   * @deprecated instead inject the required object where needed
+   */
+  @Deprecated
+  public static ChangelogBackend getInstance()
+  {
+    return (ChangelogBackend) DirectoryServer.getBackend(CHANGELOG_BASE_DN);
+  }
+
   /** {@inheritDoc} */
   @Override
   public void configureBackend(final Configuration config) throws ConfigException
@@ -176,29 +208,16 @@
   @Override
   public void initializeBackend() throws InitializationException
   {
-    try
-    {
-      baseChangelogDN = DN.decode(DN_EXTERNAL_CHANGELOG_ROOT);
-      baseDNs = new DN[] { baseChangelogDN };
-    }
-    catch (final DirectoryException e)
-    {
-      if (debugEnabled())
-      {
-        TRACER.debugCaught(DebugLogLevel.ERROR, e);
-      }
-      throw new InitializationException(
-          ERR_BACKEND_CANNOT_DECODE_BACKEND_ROOT_DN.get(getBackendID(), getExceptionMessage(e)), e);
-    }
+    baseDNs = new DN[] { CHANGELOG_BASE_DN };
 
     try
     {
-      DirectoryServer.registerBaseDN(baseChangelogDN, this, true);
+      DirectoryServer.registerBaseDN(CHANGELOG_BASE_DN, this, true);
     }
     catch (final DirectoryException e)
     {
       throw new InitializationException(
-          ERR_BACKEND_CANNOT_REGISTER_BASEDN.get(baseChangelogDN.toString(), getExceptionMessage(e)), e);
+          ERR_BACKEND_CANNOT_REGISTER_BASEDN.get(CHANGELOG_BASE_DN.toString(), getExceptionMessage(e)), e);
     }
   }
 
@@ -206,9 +225,11 @@
   @Override
   public void finalizeBackend()
   {
+    super.finalizeBackend();
+
     try
     {
-      DirectoryServer.deregisterBaseDN(baseChangelogDN);
+      DirectoryServer.deregisterBaseDN(CHANGELOG_BASE_DN);
     }
     catch (final DirectoryException e)
     {
@@ -299,7 +320,7 @@
     @Override
     public DN getBaseDN()
     {
-      return baseChangelogDN;
+      return CHANGELOG_BASE_DN;
     }
 
     @Override
@@ -313,6 +334,13 @@
     {
       return SearchScope.WHOLE_SUBTREE;
     }
+
+    /** {@inheritDoc} */
+    @Override
+    public Object setAttachment(String name, Object value)
+    {
+      return null;
+    }
   }
 
   /** {@inheritDoc} */
@@ -320,7 +348,7 @@
   public long numSubordinates(final DN entryDN, final boolean subtree) throws DirectoryException
   {
     // Compute the num subordinates only for the base DN
-    if (entryDN == null || !baseChangelogDN.equals(entryDN))
+    if (entryDN == null || !CHANGELOG_BASE_DN.equals(entryDN))
     {
       return -1;
     }
@@ -329,11 +357,9 @@
       return 1;
     }
     // Search with cookie mode to count all update messages
-    final Set<String> excludedDomains = MultimasterReplication.getECLDisabledDomains();
-    excludedDomains.add(DN_EXTERNAL_CHANGELOG_ROOT);
-    SearchParams params = new SearchParams("0", excludedDomains);
+    final SearchParams params = new SearchParams(getExcludedDomains());
     params.requestType = REQUEST_TYPE_FROM_COOKIE;
-    params.multiDomainServerState = new MultiDomainServerState();
+    params.cookie = new MultiDomainServerState();
     NumSubordinatesSearchOperation searchOp = new NumSubordinatesSearchOperation();
     try
     {
@@ -342,11 +368,118 @@
     catch (ChangelogException e)
     {
       throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, ERR_CHANGELOG_BACKEND_NUM_SUBORDINATES.get(
-          baseChangelogDN.toString(), stackTraceToSingleLineString(e)));
+          CHANGELOG_BASE_DN.toString(), stackTraceToSingleLineString(e)));
     }
     return searchOp.numSubordinates;
   }
 
+  private Set<String> getExcludedDomains()
+  {
+    final Set<String> domains = MultimasterReplication.getECLDisabledDomains();
+    domains.add(DN_EXTERNAL_CHANGELOG_ROOT);
+    return domains;
+  }
+
+  /**
+   * Notifies persistent searches of this backend that a new entry was added to it.
+   * <p>
+   * Note: This method is called in a multi-threaded context.
+   *
+   * @param baseDN
+   *          the baseDN of the newly added entry.
+   * @param changeNumber
+   *          the change number of the newly added entry. It will be greater
+   *          than zero for entries added to the change number index and less
+   *          than or equal to zero for entries added to any replica DB
+   * @param cookieString
+   *          a string representing the cookie of the newly added entry.
+   *          This is only meaningful for entries added to the change number index
+   * @param updateMsg
+   *          the update message of the newly added entry
+   * @throws ChangelogException
+   *           If a problem occurs while notifying of the newly added entry.
+   */
+  public void notifyEntryAdded(DN baseDN, long changeNumber, String cookieString, UpdateMsg updateMsg)
+      throws ChangelogException
+  {
+    final boolean isCookieEntry = changeNumber <= 0;
+    final List<SearchOperation> pSearchOps = getPersistentSearches(isCookieEntry);
+    if (pSearchOps.isEmpty() || !(updateMsg instanceof LDAPUpdateMsg))
+    {
+      return;
+    }
+
+    try
+    {
+      final Entry entry = createEntryFromMsg(baseDN, changeNumber, cookieString, updateMsg);
+      for (SearchOperation pSearchOp : pSearchOps)
+      {
+        final EntrySender entrySender = (EntrySender)
+            pSearchOp.getAttachment(OID_ECL_COOKIE_EXCHANGE_CONTROL);
+
+        // when returning changesOnly, the first incoming update must return
+        // the base entry before any other changes,
+        // so force sending now, when protected by the synchronized block
+        if (isCookieEntry)
+        { // cookie based search
+          final String cookieStr;
+          synchronized (entrySender)
+          { // forbid concurrent updates to the cookie
+            entrySender.cookie.update(baseDN, updateMsg.getCSN());
+            cookieStr = entrySender.cookie.toString();
+
+            entrySender.sendBaseChangelogEntry(true);
+          }
+          final Entry entry2 = createEntryFromMsg(baseDN, changeNumber, cookieStr, updateMsg);
+          // FIXME JNR use this instead of previous line:
+          // entry.replaceAttribute(Attributes.create("changelogcookie", cookieStr));
+          entrySender.sendEntryIfMatches(entry2, cookieStr);
+        }
+        else
+        { // draft changeNumber search
+          if (!entrySender.hasReturnedBaseEntry.get())
+          {
+            synchronized (entrySender)
+            {
+              entrySender.sendBaseChangelogEntry(true);
+            }
+          }
+          entrySender.sendEntryIfMatches(entry, null);
+        }
+      }
+    }
+    catch (DirectoryException e)
+    {
+      throw new ChangelogException(e.getMessageObject(), e);
+    }
+  }
+
+  private List<SearchOperation> getPersistentSearches(boolean wantCookieBasedSearch)
+  {
+    final List<SearchOperation> results = new ArrayList<SearchOperation>();
+    for (PersistentSearch pSearch : getPersistentSearches())
+    {
+      final SearchOperation op = pSearch.getSearchOperation();
+      if (wantCookieBasedSearch == isCookieBased(op))
+      {
+        results.add(op);
+      }
+    }
+    return results;
+  }
+
+  private boolean isCookieBased(final SearchOperation searchOp)
+  {
+    for (Control c : searchOp.getRequestControls())
+    {
+      if (OID_ECL_COOKIE_EXCHANGE_CONTROL.equals(c.getOID()))
+      {
+        return true;
+      }
+    }
+    return false;
+  }
+
   /** {@inheritDoc} */
   @Override
   public void addEntry(Entry entry, AddOperation addOperation)
@@ -409,9 +542,7 @@
 
   private SearchParams buildSearchParameters(final SearchOperation searchOperation) throws DirectoryException
   {
-    final Set<String> excludedDomains = MultimasterReplication.getECLDisabledDomains();
-    excludedDomains.add(DN_EXTERNAL_CHANGELOG_ROOT);
-    final SearchParams params = new SearchParams(searchOperation.toString(), excludedDomains);
+    final SearchParams params = new SearchParams(getExcludedDomains());
     final ExternalChangelogRequestControl eclRequestControl =
         searchOperation.getRequestControl(ExternalChangelogRequestControl.DECODER);
     if (eclRequestControl == null)
@@ -421,7 +552,7 @@
     else
     {
       params.requestType = REQUEST_TYPE_FROM_COOKIE;
-      params.multiDomainServerState = eclRequestControl.getCookie();
+      params.cookie = eclRequestControl.getCookie();
     }
     return params;
   }
@@ -523,7 +654,7 @@
   {
     try
     {
-      return numSubordinates(baseChangelogDN, true) + 1;
+      return numSubordinates(CHANGELOG_BASE_DN, true) + 1;
     }
     catch (DirectoryException e)
     {
@@ -543,33 +674,28 @@
   static class SearchParams
   {
     private ECLRequestType requestType;
-    private final String operationId;
     private final Set<String> excludedBaseDNs;
     private long lowestChangeNumber = -1;
     private long highestChangeNumber = -1;
     private CSN csn = new CSN(0, 0, 0);
-    private MultiDomainServerState multiDomainServerState;
+    private MultiDomainServerState cookie;
 
     /**
      * Creates search parameters.
      */
     SearchParams()
     {
-      operationId = "";
-      excludedBaseDNs = Collections.emptySet();
+      this.excludedBaseDNs = Collections.emptySet();
     }
 
     /**
      * Creates search parameters with provided id and excluded domain DNs.
      *
-     * @param operationId
-     *          The id of the operation.
      * @param excludedBaseDNs
      *          Set of DNs to exclude from search.
      */
-    SearchParams(final String operationId, final Set<String> excludedBaseDNs)
+    SearchParams(final Set<String> excludedBaseDNs)
     {
-      this.operationId = operationId;
       this.excludedBaseDNs = excludedBaseDNs;
     }
 
@@ -802,45 +928,40 @@
   private void searchFromCookie(final SearchParams searchParams, final SearchOperation searchOperation)
       throws DirectoryException, ChangelogException
   {
-    final ReplicationDomainDB replicationDomainDB = replicationServer.getChangelogDB().getReplicationDomainDB();
     validateProvidedCookie(searchParams);
+    final boolean isPersistentSearch = isPersistentSearch(searchOperation);
 
-    boolean hasReturnedBaseEntry = false;
+    final EntrySender entrySender = new EntrySender(searchOperation, searchParams.cookie);
+    if (isPersistentSearch)
+    {
+      searchOperation.setAttachment(OID_ECL_COOKIE_EXCHANGE_CONTROL, entrySender);
+    }
+
     ECLMultiDomainDBCursor replicaUpdatesCursor = null;
     try
     {
+      final ReplicationDomainDB replicationDomainDB = getChangelogDB().getReplicationDomainDB();
       final MultiDomainDBCursor cursor = replicationDomainDB.getCursorFrom(
-          searchParams.multiDomainServerState, AFTER_MATCHING_KEY, searchParams.getExcludedBaseDNs());
+          searchParams.cookie, AFTER_MATCHING_KEY, searchParams.getExcludedBaseDNs());
       replicaUpdatesCursor = new ECLMultiDomainDBCursor(domainPredicate, cursor);
 
-      MultiDomainServerState cookie = searchParams.multiDomainServerState;
       boolean continueSearch = true;
       while (continueSearch && replicaUpdatesCursor.next())
       {
-        // Handle creation of base changelog entry on first update message found
-        if (!hasReturnedBaseEntry)
-        {
-          if (!returnBaseChangelogEntry(searchOperation, true))
-          {
-            return;
-          }
-          hasReturnedBaseEntry = true;
-        }
         // Handle the update message
         final UpdateMsg updateMsg = replicaUpdatesCursor.getRecord();
         final DN domainBaseDN = replicaUpdatesCursor.getData();
-        cookie.update(domainBaseDN, updateMsg.getCSN());
-        final Entry entry = createEntryFromMsg(domainBaseDN, 0L, cookie.toString(), updateMsg);
-        if (matchBaseAndScopeAndFilter(entry, searchOperation))
-        {
-          Control control = new EntryChangelogNotificationControl(true, cookie.toString());
-          continueSearch = searchOperation.returnEntry(entry, Arrays.asList(control));
-        }
+        searchParams.cookie.update(domainBaseDN, updateMsg.getCSN());
+        final String cookieString = searchParams.cookie.toString();
+
+        final Entry entry = createEntryFromMsg(domainBaseDN, 0, cookieString, updateMsg);
+        continueSearch = entrySender.sendEntryIfMatches(entry, cookieString);
       }
-      // Handle creation of base changelog entry when no update message is found
-      if (!hasReturnedBaseEntry)
+
+      if (!isPersistentSearch)
       {
-        returnBaseChangelogEntry(searchOperation, false);
+        // send the base changelog entry if no update message is found
+        entrySender.sendBaseChangelogEntry(false);
       }
     }
     finally
@@ -849,6 +970,52 @@
     }
   }
 
+  private boolean isPersistentSearch(SearchOperation op)
+  {
+    for (PersistentSearch pSearch : getPersistentSearches())
+    {
+      if (op == pSearch.getSearchOperation())
+      {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public void registerPersistentSearch(PersistentSearch pSearch)
+  {
+    super.registerPersistentSearch(pSearch);
+
+    final SearchOperation searchOp = pSearch.getSearchOperation();
+    if (pSearch.isChangesOnly())
+    {
+      // this persistent search will not go through #search0() down below
+      // so we must initialize the cookie here
+      searchOp.setAttachment(OID_ECL_COOKIE_EXCHANGE_CONTROL,
+          new EntrySender(searchOp, getNewestCookie(searchOp)));
+    }
+  }
+
+  private MultiDomainServerState getNewestCookie(SearchOperation searchOp)
+  {
+    if (!isCookieBased(searchOp))
+    {
+      return null;
+    }
+
+    final MultiDomainServerState cookie = new MultiDomainServerState();
+    for (final Iterator<ReplicationServerDomain> it =
+        replicationServer.getDomainIterator(); it.hasNext();)
+    {
+      final DN baseDN = it.next().getBaseDN();
+      final ServerState state = getChangelogDB().getReplicationDomainDB().getDomainNewestCSNs(baseDN);
+      cookie.update(baseDN, state);
+    }
+    return cookie;
+  }
+
   /**
    * Validates the cookie contained in search parameters by checking its content
    * with the actual replication server state.
@@ -858,7 +1025,7 @@
    */
   private void validateProvidedCookie(final SearchParams searchParams) throws DirectoryException
   {
-    final MultiDomainServerState state = searchParams.multiDomainServerState;
+    final MultiDomainServerState state = searchParams.cookie;
     if (state != null && !state.isEmpty())
     {
       replicationServer.validateServerState(state, searchParams.getExcludedBaseDNs());
@@ -871,102 +1038,67 @@
   private void searchFromChangeNumber(final SearchParams params, final SearchOperation searchOperation)
       throws ChangelogException, DirectoryException
   {
-    boolean hasReturnedBaseEntry = false;
-    final ChangelogDB changelogDB = replicationServer.getChangelogDB();
+    final EntrySender entrySender = new EntrySender(searchOperation, null);
+    final boolean isPersistentSearch = isPersistentSearch(searchOperation);
+    if (isPersistentSearch)
+    {
+      searchOperation.setAttachment(OID_ECL_COOKIE_EXCHANGE_CONTROL, entrySender);
+    }
+
     DBCursor<ChangeNumberIndexRecord> cnIndexDBCursor = null;
     MultiDomainDBCursor replicaUpdatesCursor = null;
-    try {
-      cnIndexDBCursor = getCNIndexDBCursor(changelogDB, params.lowestChangeNumber);
+    try
+    {
+      cnIndexDBCursor = getCNIndexDBCursor(params.lowestChangeNumber);
       boolean continueSearch = true;
       while (continueSearch && cnIndexDBCursor.next())
       {
-        // Handle creation of base changelog entry on cnIndex record found
-        if (!hasReturnedBaseEntry)
-        {
-          if (!returnBaseChangelogEntry(searchOperation, true))
-          {
-            return;
-          }
-          hasReturnedBaseEntry = true;
-        }
         // Handle the current cnIndex record
         final ChangeNumberIndexRecord cnIndexRecord = cnIndexDBCursor.getRecord();
         if (replicaUpdatesCursor == null)
         {
-          replicaUpdatesCursor = initializeReplicaUpdatesCursor(changelogDB, cnIndexRecord);
+          replicaUpdatesCursor = initializeReplicaUpdatesCursor(cnIndexRecord);
         }
         continueSearch = params.changeNumberIsInRange(cnIndexRecord.getChangeNumber());
         if (continueSearch)
         {
-           UpdateMsg updateMsg = findReplicaUpdateMessage(cnIndexRecord, replicaUpdatesCursor);
-           if (updateMsg != null)
-           {
-             continueSearch = returnEntryForUpdateMessage(searchOperation, cnIndexRecord, updateMsg);
-             replicaUpdatesCursor.next();
-           }
+          UpdateMsg updateMsg = findReplicaUpdateMessage(cnIndexRecord, replicaUpdatesCursor);
+          if (updateMsg != null)
+          {
+            continueSearch = sendEntryForUpdateMessage(entrySender, cnIndexRecord, updateMsg);
+            replicaUpdatesCursor.next();
+          }
         }
       }
-      // Handle creation of base changelog entry when no update message is found
-      if (!hasReturnedBaseEntry)
+
+      if (!isPersistentSearch)
       {
-        returnBaseChangelogEntry(searchOperation, false);
+        // send the base changelog entry if no update message is found
+        entrySender.sendBaseChangelogEntry(false);
       }
     }
-    finally {
+    finally
+    {
       StaticUtils.close(cnIndexDBCursor, replicaUpdatesCursor);
     }
   }
 
   /**
-   * Create and returns the base changelog entry to provided search operation.
-   *
    * @return {@code true} if search should continue, {@code false} otherwise
    */
-  private boolean returnBaseChangelogEntry(final SearchOperation searchOperation, boolean hasSubordinates)
-      throws DirectoryException
+  private boolean sendEntryForUpdateMessage(EntrySender entrySender,
+      ChangeNumberIndexRecord cnIndexRecord, UpdateMsg updateMsg) throws DirectoryException
   {
-    final DN baseDN = searchOperation.getBaseDN();
-    final SearchFilter filter = searchOperation.getFilter();
-    final SearchScope scope = searchOperation.getScope();
-
-    if (baseChangelogDN.matchesBaseAndScope(baseDN, scope))
-    {
-      final Entry entry = buildBaseChangelogEntry(hasSubordinates);
-      if (filter.matchesEntry(entry) && !searchOperation.returnEntry(entry, null))
-      {
-        // Abandon, size limit reached.
-        return false;
-      }
-    }
-    if (baseDN.equals(baseChangelogDN) && scope.equals(SearchScope.BASE_OBJECT))
-    {
-      // Only the change log root entry was requested
-      return false;
-    }
-    return true;
-  }
-
-  /**
-   * @return {@code true} if search should continue, {@code false} otherwise
-   */
-  private boolean returnEntryForUpdateMessage(
-      final SearchOperation searchOperation,
-      final ChangeNumberIndexRecord cnIndexRecord,
-      final UpdateMsg updateMsg)
-          throws DirectoryException
-  {
+    final DN baseDN = cnIndexRecord.getBaseDN();
     final MultiDomainServerState cookie = new MultiDomainServerState(cnIndexRecord.getPreviousCookie());
-    final DN changeDN = cnIndexRecord.getBaseDN();
-    cookie.update(changeDN, cnIndexRecord.getCSN());
-    final Entry entry = createEntryFromMsg(changeDN, cnIndexRecord.getChangeNumber(), cookie.toString(), updateMsg);
-    if (matchBaseAndScopeAndFilter(entry, searchOperation))
-    {
-      return searchOperation.returnEntry(entry, null);
-    }
-    return true;
+    cookie.update(baseDN, cnIndexRecord.getCSN());
+    final String cookieString = cookie.toString();
+
+    final Entry entry = createEntryFromMsg(baseDN, cnIndexRecord.getChangeNumber(), cookieString, updateMsg);
+    return entrySender.sendEntryIfMatches(entry, null);
   }
 
-  private MultiDomainDBCursor initializeReplicaUpdatesCursor(final ChangelogDB changelogDB,
+  private MultiDomainDBCursor initializeReplicaUpdatesCursor(
       final ChangeNumberIndexRecord cnIndexRecord) throws ChangelogException
   {
     final MultiDomainServerState state = new MultiDomainServerState();
@@ -975,7 +1107,7 @@
     // No need for ECLMultiDomainDBCursor in this case
     // as updateMsg will be matched with cnIndexRecord
     final MultiDomainDBCursor replicaUpdatesCursor =
-        changelogDB.getReplicationDomainDB().getCursorFrom(state, ON_MATCHING_KEY);
+        getChangelogDB().getReplicationDomainDB().getCursorFrom(state, ON_MATCHING_KEY);
     replicaUpdatesCursor.next();
     return replicaUpdatesCursor;
   }
@@ -1023,10 +1155,10 @@
   }
 
   /** Returns a cursor on CNIndexDB for the provided first change number. */
-  private DBCursor<ChangeNumberIndexRecord> getCNIndexDBCursor(final ChangelogDB changelogDB,
+  private DBCursor<ChangeNumberIndexRecord> getCNIndexDBCursor(
       final long firstChangeNumber) throws ChangelogException
   {
-    final ChangeNumberIndexDB cnIndexDB = changelogDB.getChangeNumberIndexDB();
+    final ChangeNumberIndexDB cnIndexDB = getChangelogDB().getChangeNumberIndexDB();
     long changeNumberToUse = firstChangeNumber;
     if (changeNumberToUse <= 1)
     {
@@ -1036,31 +1168,6 @@
     return cnIndexDB.getCursorFrom(changeNumberToUse);
   }
 
-  /** Indicates if the provided entry matches the filter, base and scope. */
-  private boolean matchBaseAndScopeAndFilter(Entry entry, SearchOperation searchOp) throws DirectoryException
-  {
-    return entry.matchesBaseAndScope(searchOp.getBaseDN(), searchOp.getScope())
-        && searchOp.getFilter().matchesEntry(entry);
-  }
-
-  /**
-   * Retrieves the base changelog entry.
-   */
-  private Entry buildBaseChangelogEntry(boolean hasSubordinates)
-  {
-    final Map<AttributeType, List<Attribute>> userAttrs = new LinkedHashMap<AttributeType,List<Attribute>>();
-    final Map<AttributeType, List<Attribute>> operationalAttrs = new LinkedHashMap<AttributeType,List<Attribute>>();
-
-    addAttributeByUppercaseName(ATTR_COMMON_NAME, ATTR_COMMON_NAME, BACKEND_ID, userAttrs, operationalAttrs);
-    addAttributeByUppercaseName(ATTR_SUBSCHEMA_SUBENTRY_LC, ATTR_SUBSCHEMA_SUBENTRY,
-        ConfigConstants.DN_DEFAULT_SCHEMA_ROOT, userAttrs, operationalAttrs);
-    addAttributeByUppercaseName("hassubordinates", "hasSubordinates", Boolean.toString(hasSubordinates),
-        userAttrs, operationalAttrs);
-    addAttributeByUppercaseName("entrydn", "entryDN", baseChangelogDN.toString(),
-        userAttrs, operationalAttrs);
-    return new Entry(baseChangelogDN, CHANGELOG_ROOT_OBJECT_CLASSES, userAttrs, operationalAttrs);
-  }
-
   /**
    * Creates a changelog entry.
    */
@@ -1225,16 +1332,16 @@
   {
     final CSN csn = msg.getCSN();
     String dnString;
-    if (changeNumber == 0)
-    {
-      // Cookie mode
-      dnString = "replicationCSN=" + csn + "," + baseDN.toString() + "," + DN_EXTERNAL_CHANGELOG_ROOT;
-    }
-    else
+    if (changeNumber > 0)
     {
       // Draft compat mode
       dnString = "changeNumber=" + changeNumber + "," + DN_EXTERNAL_CHANGELOG_ROOT;
     }
+    else
+    {
+      // Cookie mode
+      dnString = "replicationCSN=" + csn + "," + baseDN + "," + DN_EXTERNAL_CHANGELOG_ROOT;
+    }
 
     final Map<AttributeType, List<Attribute>> userAttrs = new LinkedHashMap<AttributeType, List<Attribute>>();
     final Map<AttributeType, List<Attribute>> opAttrs = new LinkedHashMap<AttributeType, List<Attribute>>();
@@ -1247,7 +1354,7 @@
     addAttributeByType("entrydn", "entryDN", dnString, userAttrs, opAttrs);
 
     // REQUIRED attributes
-    if (changeNumber != 0)
+    if (changeNumber > 0)
     {
       addAttributeByType("changenumber", "changeNumber", String.valueOf(changeNumber), userAttrs, opAttrs);
     }
@@ -1277,7 +1384,8 @@
     {
       addAttributeByType("targetentryuuid", "targetEntryUUID", targetUUID, userAttrs, opAttrs);
     }
-    addAttributeByType("changelogcookie", "changeLogCookie", cookie, userAttrs, opAttrs);
+    final String cookie2 = cookie != null ? cookie : "";
+    addAttributeByType("changelogcookie", "changeLogCookie", cookie2, userAttrs, opAttrs);
 
     final List<RawAttribute> includedAttributes = msg.getEclIncludes();
     if (includedAttributes != null && !includedAttributes.isEmpty())
@@ -1300,6 +1408,116 @@
     return new Entry(DN.decode(dnString), CHANGELOG_ENTRY_OBJECT_CLASSES, userAttrs, opAttrs);
   }
 
+  /**
+   * Used to send entries to searches on cn=changelog. This class ensures the
+   * base changelog entry is sent before sending any other entry. It is also
+   * used as a store when going from the "initial search" phase to the
+   * "persistent search" phase.
+   */
+  private static class EntrySender
+  {
+
+    private final SearchOperation searchOp;
+    /**
+     * Used by the cookie-based searches to communicate the cookie between the
+     * initial search phase and the persistent search phase. This is unused with
+     * draft change number searches.
+     */
+    private final MultiDomainServerState cookie;
+    private final AtomicBoolean hasReturnedBaseEntry = new AtomicBoolean();
+
+    public EntrySender(SearchOperation searchOp, MultiDomainServerState cookie)
+    {
+      this.searchOp = searchOp;
+      this.cookie = cookie;
+    }
+
+    /**
+     * Sends the entry if it matches the base, scope and filter of the current search operation.
+     * It will also send the base changelog entry if it needs to be sent and was not sent before.
+     *
+     * @return {@code true} if search should continue, {@code false} otherwise
+     */
+    private boolean sendEntryIfMatches(Entry entry, String cookie) throws DirectoryException
+    {
+      // About to send one entry: ensure the base changelog entry is sent first
+      if (!sendBaseChangelogEntry(true))
+      {
+        // only return the base entry: stop here
+        return false;
+      }
+      if (matchBaseAndScopeAndFilter(entry))
+      {
+        return searchOp.returnEntry(entry, getControls(cookie));
+      }
+      // maybe the next entry will match?
+      return true;
+    }
+
+    /** Indicates if the provided entry matches the filter, base and scope. */
+    private boolean matchBaseAndScopeAndFilter(Entry entry) throws DirectoryException
+    {
+      return entry.matchesBaseAndScope(searchOp.getBaseDN(), searchOp.getScope())
+          && searchOp.getFilter().matchesEntry(entry);
+    }
+
+    private List<Control> getControls(String cookie)
+    {
+      if (cookie != null)
+      {
+        Control c = new EntryChangelogNotificationControl(true, cookie);
+        return Arrays.asList(c);
+      }
+      return Collections.emptyList();
+    }
+
+    /**
+     * Create and returns the base changelog entry to the underlying search operation.
+     *
+     * @return {@code true} if search should continue, {@code false} otherwise
+     */
+    private boolean sendBaseChangelogEntry(boolean hasSubordinates) throws DirectoryException
+    {
+      if (hasReturnedBaseEntry.compareAndSet(false, true))
+      {
+        final DN baseDN = searchOp.getBaseDN();
+        final SearchFilter filter = searchOp.getFilter();
+        final SearchScope scope = searchOp.getScope();
+
+        if (ChangelogBackend.CHANGELOG_BASE_DN.matchesBaseAndScope(baseDN, scope))
+        {
+          final Entry entry = buildBaseChangelogEntry(hasSubordinates);
+          if (filter.matchesEntry(entry) && !searchOp.returnEntry(entry, null))
+          {
+            // Abandon, size limit reached.
+            return false;
+          }
+        }
+        return !baseDN.equals(ChangelogBackend.CHANGELOG_BASE_DN)
+            || !scope.equals(SearchScope.BASE_OBJECT);
+      }
+      return true;
+    }
+
+    private Entry buildBaseChangelogEntry(boolean hasSubordinates)
+    {
+      final Map<AttributeType, List<Attribute>> userAttrs =
+          new LinkedHashMap<AttributeType, List<Attribute>>();
+      final Map<AttributeType, List<Attribute>> operationalAttrs =
+          new LinkedHashMap<AttributeType, List<Attribute>>();
+
+      addAttributeByUppercaseName(ATTR_COMMON_NAME, ATTR_COMMON_NAME,
+          ChangelogBackend.BACKEND_ID, userAttrs, operationalAttrs);
+      addAttributeByUppercaseName(ATTR_SUBSCHEMA_SUBENTRY_LC, ATTR_SUBSCHEMA_SUBENTRY,
+          ConfigConstants.DN_DEFAULT_SCHEMA_ROOT, userAttrs, operationalAttrs);
+      addAttributeByUppercaseName("hassubordinates", "hasSubordinates",
+          Boolean.toString(hasSubordinates), userAttrs, operationalAttrs);
+      addAttributeByUppercaseName("entrydn", "entryDN",
+          ServerConstants.DN_EXTERNAL_CHANGELOG_ROOT, userAttrs, operationalAttrs);
+      return new Entry(CHANGELOG_BASE_DN, CHANGELOG_ROOT_OBJECT_CLASSES, userAttrs, operationalAttrs);
+    }
+  }
+
   private static void addAttribute(final Entry e, final String attrType, final String attrValue)
   {
     e.addAttribute(Attributes.create(attrType, attrValue), null);
@@ -1313,7 +1531,7 @@
     addAttribute(attrNameLowercase, attrNameUppercase, attrValue, userAttrs, operationalAttrs, true);
   }
 
-  private void addAttributeByUppercaseName(String attrNameLowercase,
+  private static void addAttributeByUppercaseName(String attrNameLowercase,
       String attrNameUppercase,  String attrValue,
       Map<AttributeType, List<Attribute>> userAttrs,
       Map<AttributeType, List<Attribute>> operationalAttrs)
@@ -1331,8 +1549,9 @@
     {
       attrType = DirectoryServer.getDefaultAttributeType(attrNameUppercase);
     }
-    final Attribute a = addByType ?
-        Attributes.create(attrType, attrValue) : Attributes.create(attrNameUppercase, attrValue);
+    final Attribute a = addByType
+        ? Attributes.create(attrType, attrValue)
+        : Attributes.create(attrNameUppercase, attrValue);
     final List<Attribute> attrList = Collections.singletonList(a);
     if (attrType.isOperational())
     {

--
Gitblit v1.10.0