mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
18.32.2014 474b41764f7f5ebcb021dd1445af8735977e1cab
opendj-sdk/opends/src/server/org/opends/server/api/Backend.java
@@ -891,8 +891,10 @@
   *
   * @param persistentSearch
   *          The persistent search operation to register with this backend
   * @throws DirectoryException
   *           If a problem occurs while registering the persistent search
   */
  public void registerPersistentSearch(PersistentSearch persistentSearch)
  public void registerPersistentSearch(PersistentSearch persistentSearch) throws DirectoryException
  {
    persistentSearches.add(persistentSearch);
opendj-sdk/opends/src/server/org/opends/server/backends/ChangelogBackend.java
@@ -185,7 +185,6 @@
  /** Attribute name literal {@code "changeNumber"}. */
  private static final String CHANGE_NUMBER_ATTR = "changeNumber";
  /** Lower-cased form of {@link #CHANGE_NUMBER_ATTR}; note that the no-arg toLowerCase() uses the default locale. */
  private static final String CHANGE_NUMBER_ATTR_LC = CHANGE_NUMBER_ATTR.toLowerCase();
  /** Attachment key under which the cookie is stored on a search operation. */
  private static final String COOKIE_ATTACHMENT = OID_ECL_COOKIE_EXCHANGE_CONTROL + ".cookie";
  /** Attachment key under which the entry sender is stored on a search operation. */
  private static final String ENTRY_SENDER_ATTACHMENT = OID_ECL_COOKIE_EXCHANGE_CONTROL + ".entrySender";
  /** The set of objectclasses that will be used in root entry. */
@@ -985,18 +984,16 @@
  {
    validateProvidedCookie(searchParams);
    final MultiDomainServerState cookie = searchParams.cookie;
    final CookieEntrySender entrySender;
    if (isPersistentSearch(searchOperation))
    {
      // communicate the cookie between the "initial search" phase and the "persistent search" phase
      searchOperation.setAttachment(COOKIE_ATTACHMENT, cookie);
      entrySender = searchOperation.getAttachment(ENTRY_SENDER_ATTACHMENT);
    }
    else
    {
      entrySender = new CookieEntrySender(searchOperation, SearchPhase.INITIAL);
    }
    entrySender.setCookie(searchParams.cookie);
    if (!sendBaseChangelogEntry(searchOperation))
    { // only return the base entry: stop here
@@ -1008,14 +1005,14 @@
    {
      final ReplicationDomainDB replicationDomainDB = getChangelogDB().getReplicationDomainDB();
      final MultiDomainDBCursor cursor = replicationDomainDB.getCursorFrom(
          cookie, GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, searchParams.getExcludedBaseDNs());
          searchParams.cookie, GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, searchParams.getExcludedBaseDNs());
      replicaUpdatesCursor = new ECLMultiDomainDBCursor(domainPredicate, cursor);
      boolean continueSearch = sendCookieEntriesFromCursor(entrySender, replicaUpdatesCursor, cookie);
      final boolean continueSearch = sendCookieEntriesFromCursor(entrySender, replicaUpdatesCursor);
      if (continueSearch)
      {
        entrySender.transitioningToPersistentSearchPhase();
        sendCookieEntriesFromCursor(entrySender, replicaUpdatesCursor, cookie);
        sendCookieEntriesFromCursor(entrySender, replicaUpdatesCursor);
      }
    }
    finally
@@ -1026,15 +1023,14 @@
  }
  /**
   * Walks the provided cursor and hands each record to the entry sender, stopping as soon as the
   * sender reports that the search must not continue or the cursor is exhausted.
   * <p>
   * NOTE(review): this span comes from a diff view and shows both the pre-change and the
   * post-change forms of the signature and of the {@code initialSearchSendEntry} call
   * (with and without the {@code cookie} argument).
   *
   * @param entrySender
   *          the sender each record is passed to
   * @param replicaUpdatesCursor
   *          the cursor providing the update messages and their domain base DN
   * @return the last value returned by the entry sender, or {@code true} if the cursor held no record
   * @throws ChangelogException
   *           declared by this method; presumably raised while reading the cursor - TODO confirm
   * @throws DirectoryException
   *           declared by this method; presumably raised while sending an entry - TODO confirm
   */
  private boolean sendCookieEntriesFromCursor(final CookieEntrySender entrySender,
      ECLMultiDomainDBCursor replicaUpdatesCursor, final MultiDomainServerState cookie) throws ChangelogException,
      DirectoryException
      final ECLMultiDomainDBCursor replicaUpdatesCursor) throws ChangelogException, DirectoryException
  {
    boolean continueSearch = true;
    // the cursor is only advanced while the sender still wants more entries
    while (continueSearch && replicaUpdatesCursor.next())
    {
      final UpdateMsg updateMsg = replicaUpdatesCursor.getRecord();
      final DN domainBaseDN = replicaUpdatesCursor.getData();
      continueSearch = entrySender.initialSearchSendEntry(updateMsg, domainBaseDN, cookie);
      continueSearch = entrySender.initialSearchSendEntry(updateMsg, domainBaseDN);
    }
    return continueSearch;
  }
@@ -1053,9 +1049,10 @@
  /** {@inheritDoc} */
  @Override
  public void registerPersistentSearch(PersistentSearch pSearch)
  public void registerPersistentSearch(PersistentSearch pSearch) throws DirectoryException
  {
    initializeAttachements(pSearch);
    validatePersistentSearch(pSearch);
    initializeEntrySender(pSearch);
    if (isCookieBased(pSearch.getSearchOperation()))
    {
@@ -1068,22 +1065,41 @@
    super.registerPersistentSearch(pSearch);
  }
  private void initializeAttachements(PersistentSearch pSearch)
  private void validatePersistentSearch(final PersistentSearch pSearch) throws DirectoryException
  {
    // Validation must be done during registration for changes only persistent searches.
    // Otherwise, when there is an initial search phase,
    // validation is performed by the search() method.
    if (pSearch.isChangesOnly())
    {
      final SearchOperation searchOperation = pSearch.getSearchOperation();
      checkChangelogReadPrivilege(searchOperation);
      final SearchParams params = buildSearchParameters(searchOperation);
      // next line also validates some search parameters
      optimizeSearchParameters(params, searchOperation.getBaseDN(), searchOperation.getFilter());
      validateProvidedCookie(params);
    }
  }
  /**
   * Creates the entry sender for the provided persistent search and stores it on the search
   * operation under {@code ENTRY_SENDER_ATTACHMENT}.
   * <p>
   * The sender starts in the {@code PERSISTENT} phase for a changes only persistent search and in
   * the {@code INITIAL} phase otherwise. A cookie based search gets a {@code CookieEntrySender},
   * any other search gets a {@code ChangeNumberEntrySender}.
   * <p>
   * NOTE(review): this span comes from a diff view and interleaves pre-change and post-change
   * lines (e.g. the two ways of initializing the cookie, and the senders created with a hard-coded
   * {@code SearchPhase.PERSISTENT} versus {@code startPhase}).
   *
   * @param pSearch
   *          the persistent search being registered
   */
  private void initializeEntrySender(PersistentSearch pSearch)
  {
    final SearchPhase startPhase = pSearch.isChangesOnly() ? SearchPhase.PERSISTENT : SearchPhase.INITIAL;
    final SearchOperation searchOp = pSearch.getSearchOperation();
    if (isCookieBased(searchOp))
    {
      final CookieEntrySender entrySender = new CookieEntrySender(searchOp, startPhase);
      searchOp.setAttachment(ENTRY_SENDER_ATTACHMENT, entrySender);
      if (pSearch.isChangesOnly())
      {
        // this changesOnly persistent search will not go through #initialSearch()
        // so we must initialize the cookie here
        searchOp.setAttachment(COOKIE_ATTACHMENT, getNewestCookie(searchOp));
        entrySender.setCookie(getNewestCookie(searchOp));
      }
      searchOp.setAttachment(ENTRY_SENDER_ATTACHMENT, new CookieEntrySender(searchOp, SearchPhase.PERSISTENT));
    }
    else
    {
      searchOp.setAttachment(ENTRY_SENDER_ATTACHMENT, new ChangeNumberEntrySender(searchOp, SearchPhase.PERSISTENT));
      searchOp.setAttachment(ENTRY_SENDER_ATTACHMENT, new ChangeNumberEntrySender(searchOp, startPhase));
    }
  }
@@ -1762,7 +1778,7 @@
      sendEntryData.finalizeInitialSearch();
    }
    /**
     * Forwards the transition to the persistent search phase to {@code sendEntryData}.
     * <p>
     * NOTE(review): this span comes from a diff view and shows both the pre-change (public) and
     * the post-change (private) signature.
     */
    public void transitioningToPersistentSearchPhase()
    private void transitioningToPersistentSearchPhase()
    {
      sendEntryData.transitioningToPersistentSearchPhase();
    }
@@ -1792,6 +1808,7 @@
  private static class CookieEntrySender {
    private final SearchOperation searchOp;
    private final SearchPhase startPhase;
    private MultiDomainServerState cookie;
    private final ConcurrentSkipListMap<Pair<DN, Integer>, SendEntryData<CSN>> replicaIdToSendEntryData =
        new ConcurrentSkipListMap<Pair<DN, Integer>, SendEntryData<CSN>>(Pair.COMPARATOR);
@@ -1801,7 +1818,12 @@
      this.startPhase = startPhase;
    }
    public void finalizeInitialSearch()
    private void setCookie(MultiDomainServerState cookie)
    {
      this.cookie = cookie;
    }
    private void finalizeInitialSearch()
    {
      for (SendEntryData<CSN> sendEntryData : replicaIdToSendEntryData.values())
      {
@@ -1809,7 +1831,7 @@
      }
    }
    public void transitioningToPersistentSearchPhase()
    private void transitioningToPersistentSearchPhase()
    {
      for (SendEntryData<CSN> sendEntryData : replicaIdToSendEntryData.values())
      {
@@ -1830,13 +1852,12 @@
      return data;
    }
    /**
     * Sends the entry built from the provided update message during the initial search phase.
     * <p>
     * The CSN of the message is first recorded on the matching {@code SendEntryData}, then the
     * cookie is updated and its string form is used both to build the entry and to send it.
     * <p>
     * NOTE(review): this span comes from a diff view and shows both the pre-change and the
     * post-change forms of the signature and of the {@code updateCookie} call (with and without
     * the {@code cookie} argument).
     *
     * @param updateMsg
     *          the update message to turn into an entry
     * @param baseDN
     *          the base DN passed to the cookie update and to the entry creation
     * @return the value returned by {@code sendEntryIfMatches}; callers in this file use it to
     *         decide whether to keep sending entries
     * @throws DirectoryException
     *           declared by this method; presumably raised while building or sending the entry -
     *           TODO confirm
     */
    private boolean initialSearchSendEntry(final UpdateMsg updateMsg, final DN baseDN,
        final MultiDomainServerState cookie) throws DirectoryException
    private boolean initialSearchSendEntry(final UpdateMsg updateMsg, final DN baseDN) throws DirectoryException
    {
      final CSN csn = updateMsg.getCSN();
      final SendEntryData<CSN> sendEntryData = getSendEntryData(baseDN, csn);
      sendEntryData.initialSearchSendsEntry(csn);
      final String cookieString = updateCookie(cookie, baseDN, updateMsg.getCSN());
      final String cookieString = updateCookie(baseDN, updateMsg.getCSN());
      final Entry entry = createEntryFromMsg(baseDN, 0, cookieString, updateMsg);
      return sendEntryIfMatches(searchOp, entry, cookieString);
    }
@@ -1849,9 +1870,7 @@
      if (sendEntryData.persistentSearchCanSendEntry(csn))
      {
        // multi threaded case: wait for the "initial search" phase to set the cookie
        final MultiDomainServerState cookie = searchOp.getAttachment(COOKIE_ATTACHMENT);
        final String cookieString = updateCookie(cookie, baseDN, updateMsg.getCSN());
        final String cookieString = updateCookie(baseDN, updateMsg.getCSN());
        final Entry cookieEntry = createEntryFromMsg(baseDN, 0, cookieString, updateMsg);
        // FIXME JNR use this instead of previous line:
        // entry.replaceAttribute(Attributes.create("changelogcookie", cookieString));
@@ -1859,7 +1878,7 @@
      }
    }
    private String updateCookie(final MultiDomainServerState cookie, DN baseDN, final CSN csn)
    private String updateCookie(DN baseDN, final CSN csn)
    {
      synchronized (cookie)
      { // forbid concurrent updates to the cookie
opendj-sdk/opends/src/server/org/opends/server/workflowelement/localbackend/LocalBackendSearchOperation.java
@@ -225,34 +225,33 @@
    // will be overwritten.
    setResultCode(ResultCode.SUCCESS);
    // If there's a persistent search, then register it with the server.
    boolean processSearchNow = true;
    if (persistentSearch != null)
    {
      // If we're only interested in changes, then we do not actually want
      // to process the search now.
      processSearchNow = !persistentSearch.isChangesOnly();
      // The Core server maintains the count of concurrent persistent searches
      // so that all the backends (Remote and Local) are aware of it. Verify
      // with the core if we have already reached the threshold.
      if (!DirectoryServer.allowNewPersistentSearch())
      {
        setResultCode(ResultCode.ADMIN_LIMIT_EXCEEDED);
        appendErrorMessage(ERR_MAX_PSEARCH_LIMIT_EXCEEDED.get());
        return;
      }
      backend.registerPersistentSearch(persistentSearch);
      persistentSearch.enable();
    }
    // Process the search in the backend and all its subordinates.
    try
    {
      // If there's a persistent search, then register it with the server.
      boolean processSearchNow = true;
      if (persistentSearch != null)
      {
        // If we're only interested in changes, then we do not actually want
        // to process the search now.
        processSearchNow = !persistentSearch.isChangesOnly();
        // The Core server maintains the count of concurrent persistent searches
        // so that all the backends (Remote and Local) are aware of it. Verify
        // with the core if we have already reached the threshold.
        if (!DirectoryServer.allowNewPersistentSearch())
        {
          setResultCode(ResultCode.ADMIN_LIMIT_EXCEEDED);
          appendErrorMessage(ERR_MAX_PSEARCH_LIMIT_EXCEEDED.get());
          return;
        }
        backend.registerPersistentSearch(persistentSearch);
        persistentSearch.enable();
      }
      if (processSearchNow)
      {
        // Process the search in the backend and all its subordinates.
        backend.search(this);
      }
    }