mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
18.32.2014 4af6c0362e528ebab371325232a557a5338a31d8
OPENDJ-1548 (CR-4579) In changelog backend, persistent searches should perform the same validations as normal searches

Added validation for changes-only persistent searches inside ChangelogBackend.registerPersistentSearch().
For cookie-based searches, the cookie is now stored on the CookieEntrySender object rather than as a search operation attachment. This makes the code easier to read and understand.


Backend.java
registerPersistentSearch() now throws DirectoryException.

LocalBackendSearchOperation.java:
Consequence of the change to Backend.registerPersistentSearch().

ChangelogBackend.java:
Added validatePersistentSearch() + called it from registerPersistentSearch().
Added cookie field and setCookie() to CookieEntrySender + removed COOKIE_ATTACHMENT constant + changed the code that was using it + the cookie is no longer passed down through method calls
Renamed initializeAttachements() to initializeEntrySender() + fixed a bug with the starting phase.
3 files modified
122 ■■■■■ changed files
opends/src/server/org/opends/server/api/Backend.java 4 ●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/backends/ChangelogBackend.java 71 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/workflowelement/localbackend/LocalBackendSearchOperation.java 47 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/api/Backend.java
@@ -891,8 +891,10 @@
   *
   * @param persistentSearch
   *          The persistent search operation to register with this backend
   * @throws DirectoryException
   *           If a problem occurs while registering the persistent search
   */
  public void registerPersistentSearch(PersistentSearch persistentSearch)
  public void registerPersistentSearch(PersistentSearch persistentSearch) throws DirectoryException
  {
    persistentSearches.add(persistentSearch);
opends/src/server/org/opends/server/backends/ChangelogBackend.java
@@ -185,7 +185,6 @@
  private static final String CHANGE_NUMBER_ATTR = "changeNumber";
  private static final String CHANGE_NUMBER_ATTR_LC = CHANGE_NUMBER_ATTR.toLowerCase();
  private static final String COOKIE_ATTACHMENT = OID_ECL_COOKIE_EXCHANGE_CONTROL + ".cookie";
  private static final String ENTRY_SENDER_ATTACHMENT = OID_ECL_COOKIE_EXCHANGE_CONTROL + ".entrySender";
  /** The set of objectclasses that will be used in root entry. */
@@ -985,18 +984,16 @@
  {
    validateProvidedCookie(searchParams);
    final MultiDomainServerState cookie = searchParams.cookie;
    final CookieEntrySender entrySender;
    if (isPersistentSearch(searchOperation))
    {
      // communicate the cookie between the "initial search" phase and the "persistent search" phase
      searchOperation.setAttachment(COOKIE_ATTACHMENT, cookie);
      entrySender = searchOperation.getAttachment(ENTRY_SENDER_ATTACHMENT);
    }
    else
    {
      entrySender = new CookieEntrySender(searchOperation, SearchPhase.INITIAL);
    }
    entrySender.setCookie(searchParams.cookie);
    if (!sendBaseChangelogEntry(searchOperation))
    { // only return the base entry: stop here
@@ -1008,14 +1005,14 @@
    {
      final ReplicationDomainDB replicationDomainDB = getChangelogDB().getReplicationDomainDB();
      final MultiDomainDBCursor cursor = replicationDomainDB.getCursorFrom(
          cookie, GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, searchParams.getExcludedBaseDNs());
          searchParams.cookie, GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, searchParams.getExcludedBaseDNs());
      replicaUpdatesCursor = new ECLMultiDomainDBCursor(domainPredicate, cursor);
      boolean continueSearch = sendCookieEntriesFromCursor(entrySender, replicaUpdatesCursor, cookie);
      final boolean continueSearch = sendCookieEntriesFromCursor(entrySender, replicaUpdatesCursor);
      if (continueSearch)
      {
        entrySender.transitioningToPersistentSearchPhase();
        sendCookieEntriesFromCursor(entrySender, replicaUpdatesCursor, cookie);
        sendCookieEntriesFromCursor(entrySender, replicaUpdatesCursor);
      }
    }
    finally
@@ -1026,15 +1023,14 @@
  }
  private boolean sendCookieEntriesFromCursor(final CookieEntrySender entrySender,
      ECLMultiDomainDBCursor replicaUpdatesCursor, final MultiDomainServerState cookie) throws ChangelogException,
      DirectoryException
      final ECLMultiDomainDBCursor replicaUpdatesCursor) throws ChangelogException, DirectoryException
  {
    boolean continueSearch = true;
    while (continueSearch && replicaUpdatesCursor.next())
    {
      final UpdateMsg updateMsg = replicaUpdatesCursor.getRecord();
      final DN domainBaseDN = replicaUpdatesCursor.getData();
      continueSearch = entrySender.initialSearchSendEntry(updateMsg, domainBaseDN, cookie);
      continueSearch = entrySender.initialSearchSendEntry(updateMsg, domainBaseDN);
    }
    return continueSearch;
  }
@@ -1053,9 +1049,10 @@
  /** {@inheritDoc} */
  @Override
  public void registerPersistentSearch(PersistentSearch pSearch)
  public void registerPersistentSearch(PersistentSearch pSearch) throws DirectoryException
  {
    initializeAttachements(pSearch);
    validatePersistentSearch(pSearch);
    initializeEntrySender(pSearch);
    if (isCookieBased(pSearch.getSearchOperation()))
    {
@@ -1068,22 +1065,41 @@
    super.registerPersistentSearch(pSearch);
  }
  private void initializeAttachements(PersistentSearch pSearch)
  /**
   * Validates a persistent search at registration time.
   * <p>
   * Only changes-only persistent searches are validated here: the checks below
   * run when {@code pSearch.isChangesOnly()} is true and are skipped otherwise.
   *
   * @param pSearch
   *          the persistent search being registered
   * @throws DirectoryException
   *           declared by this method; presumably raised by one of the checks
   *           below when the search is not acceptable (their bodies are not
   *           visible here - confirm against their definitions)
   */
  private void validatePersistentSearch(final PersistentSearch pSearch) throws DirectoryException
  {
    // Validation must be done during registration for changes only persistent searches.
    // Otherwise, when there is an initial search phase,
    // validation is performed by the search() method.
    if (pSearch.isChangesOnly())
    {
      final SearchOperation searchOperation = pSearch.getSearchOperation();
      // privilege check comes first, before any search parameter is built
      checkChangelogReadPrivilege(searchOperation);
      final SearchParams params = buildSearchParameters(searchOperation);
      // next line also validates some search parameters
      optimizeSearchParameters(params, searchOperation.getBaseDN(), searchOperation.getFilter());
      // same cookie validation call as the one made at the start of the initial search phase
      validateProvidedCookie(params);
    }
  }
  private void initializeEntrySender(PersistentSearch pSearch)
  {
    final SearchPhase startPhase = pSearch.isChangesOnly() ? SearchPhase.PERSISTENT : SearchPhase.INITIAL;
    final SearchOperation searchOp = pSearch.getSearchOperation();
    if (isCookieBased(searchOp))
    {
      final CookieEntrySender entrySender = new CookieEntrySender(searchOp, startPhase);
      searchOp.setAttachment(ENTRY_SENDER_ATTACHMENT, entrySender);
      if (pSearch.isChangesOnly())
      {
        // this changesOnly persistent search will not go through #initialSearch()
        // so we must initialize the cookie here
        searchOp.setAttachment(COOKIE_ATTACHMENT, getNewestCookie(searchOp));
        entrySender.setCookie(getNewestCookie(searchOp));
      }
      searchOp.setAttachment(ENTRY_SENDER_ATTACHMENT, new CookieEntrySender(searchOp, SearchPhase.PERSISTENT));
    }
    else
    {
      searchOp.setAttachment(ENTRY_SENDER_ATTACHMENT, new ChangeNumberEntrySender(searchOp, SearchPhase.PERSISTENT));
      searchOp.setAttachment(ENTRY_SENDER_ATTACHMENT, new ChangeNumberEntrySender(searchOp, startPhase));
    }
  }
@@ -1762,7 +1778,7 @@
      sendEntryData.finalizeInitialSearch();
    }
    public void transitioningToPersistentSearchPhase()
    private void transitioningToPersistentSearchPhase()
    {
      sendEntryData.transitioningToPersistentSearchPhase();
    }
@@ -1792,6 +1808,7 @@
  private static class CookieEntrySender {
    private final SearchOperation searchOp;
    private final SearchPhase startPhase;
    private MultiDomainServerState cookie;
    private final ConcurrentSkipListMap<Pair<DN, Integer>, SendEntryData<CSN>> replicaIdToSendEntryData =
        new ConcurrentSkipListMap<Pair<DN, Integer>, SendEntryData<CSN>>(Pair.COMPARATOR);
@@ -1801,7 +1818,12 @@
      this.startPhase = startPhase;
    }
    public void finalizeInitialSearch()
    /**
     * Stores the cookie on this entry sender.
     * <p>
     * The reference is kept as is (no copy is made).
     *
     * @param cookie
     *          the cookie to associate with this entry sender
     */
    private void setCookie(MultiDomainServerState cookie)
    {
      this.cookie = cookie;
    }
    private void finalizeInitialSearch()
    {
      for (SendEntryData<CSN> sendEntryData : replicaIdToSendEntryData.values())
      {
@@ -1809,7 +1831,7 @@
      }
    }
    public void transitioningToPersistentSearchPhase()
    private void transitioningToPersistentSearchPhase()
    {
      for (SendEntryData<CSN> sendEntryData : replicaIdToSendEntryData.values())
      {
@@ -1830,13 +1852,12 @@
      return data;
    }
    private boolean initialSearchSendEntry(final UpdateMsg updateMsg, final DN baseDN,
        final MultiDomainServerState cookie) throws DirectoryException
    private boolean initialSearchSendEntry(final UpdateMsg updateMsg, final DN baseDN) throws DirectoryException
    {
      final CSN csn = updateMsg.getCSN();
      final SendEntryData<CSN> sendEntryData = getSendEntryData(baseDN, csn);
      sendEntryData.initialSearchSendsEntry(csn);
      final String cookieString = updateCookie(cookie, baseDN, updateMsg.getCSN());
      final String cookieString = updateCookie(baseDN, updateMsg.getCSN());
      final Entry entry = createEntryFromMsg(baseDN, 0, cookieString, updateMsg);
      return sendEntryIfMatches(searchOp, entry, cookieString);
    }
@@ -1849,9 +1870,7 @@
      if (sendEntryData.persistentSearchCanSendEntry(csn))
      {
        // multi threaded case: wait for the "initial search" phase to set the cookie
        final MultiDomainServerState cookie = searchOp.getAttachment(COOKIE_ATTACHMENT);
        final String cookieString = updateCookie(cookie, baseDN, updateMsg.getCSN());
        final String cookieString = updateCookie(baseDN, updateMsg.getCSN());
        final Entry cookieEntry = createEntryFromMsg(baseDN, 0, cookieString, updateMsg);
        // FIXME JNR use this instead of previous line:
        // entry.replaceAttribute(Attributes.create("changelogcookie", cookieString));
@@ -1859,7 +1878,7 @@
      }
    }
    private String updateCookie(final MultiDomainServerState cookie, DN baseDN, final CSN csn)
    private String updateCookie(DN baseDN, final CSN csn)
    {
      synchronized (cookie)
      { // forbid concurrent updates to the cookie
opends/src/server/org/opends/server/workflowelement/localbackend/LocalBackendSearchOperation.java
@@ -225,34 +225,33 @@
    // will be overwritten.
    setResultCode(ResultCode.SUCCESS);
    // If there's a persistent search, then register it with the server.
    boolean processSearchNow = true;
    if (persistentSearch != null)
    {
      // If we're only interested in changes, then we do not actually want
      // to process the search now.
      processSearchNow = !persistentSearch.isChangesOnly();
      // The Core server maintains the count of concurrent persistent searches
      // so that all the backends (Remote and Local) are aware of it. Verify
      // with the core if we have already reached the threshold.
      if (!DirectoryServer.allowNewPersistentSearch())
      {
        setResultCode(ResultCode.ADMIN_LIMIT_EXCEEDED);
        appendErrorMessage(ERR_MAX_PSEARCH_LIMIT_EXCEEDED.get());
        return;
      }
      backend.registerPersistentSearch(persistentSearch);
      persistentSearch.enable();
    }
    // Process the search in the backend and all its subordinates.
    try
    {
      // If there's a persistent search, then register it with the server.
      boolean processSearchNow = true;
      if (persistentSearch != null)
      {
        // If we're only interested in changes, then we do not actually want
        // to process the search now.
        processSearchNow = !persistentSearch.isChangesOnly();
        // The Core server maintains the count of concurrent persistent searches
        // so that all the backends (Remote and Local) are aware of it. Verify
        // with the core if we have already reached the threshold.
        if (!DirectoryServer.allowNewPersistentSearch())
        {
          setResultCode(ResultCode.ADMIN_LIMIT_EXCEEDED);
          appendErrorMessage(ERR_MAX_PSEARCH_LIMIT_EXCEEDED.get());
          return;
        }
        backend.registerPersistentSearch(persistentSearch);
        persistentSearch.enable();
      }
      if (processSearchNow)
      {
        // Process the search in the backend and all its subordinates.
        backend.search(this);
      }
    }