mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
18.32.2014 474b41764f7f5ebcb021dd1445af8735977e1cab
opendj-sdk/opends/src/server/org/opends/server/backends/ChangelogBackend.java
@@ -185,7 +185,6 @@
  private static final String CHANGE_NUMBER_ATTR = "changeNumber";
  private static final String CHANGE_NUMBER_ATTR_LC = CHANGE_NUMBER_ATTR.toLowerCase();
  private static final String COOKIE_ATTACHMENT = OID_ECL_COOKIE_EXCHANGE_CONTROL + ".cookie";
  private static final String ENTRY_SENDER_ATTACHMENT = OID_ECL_COOKIE_EXCHANGE_CONTROL + ".entrySender";
  /** The set of objectclasses that will be used in root entry. */
@@ -985,18 +984,16 @@
  {
    validateProvidedCookie(searchParams);
    final MultiDomainServerState cookie = searchParams.cookie;
    final CookieEntrySender entrySender;
    if (isPersistentSearch(searchOperation))
    {
      // communicate the cookie between the "initial search" phase and the "persistent search" phase
      searchOperation.setAttachment(COOKIE_ATTACHMENT, cookie);
      entrySender = searchOperation.getAttachment(ENTRY_SENDER_ATTACHMENT);
    }
    else
    {
      entrySender = new CookieEntrySender(searchOperation, SearchPhase.INITIAL);
    }
    entrySender.setCookie(searchParams.cookie);
    if (!sendBaseChangelogEntry(searchOperation))
    { // only return the base entry: stop here
@@ -1008,14 +1005,14 @@
    {
      final ReplicationDomainDB replicationDomainDB = getChangelogDB().getReplicationDomainDB();
      final MultiDomainDBCursor cursor = replicationDomainDB.getCursorFrom(
          cookie, GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, searchParams.getExcludedBaseDNs());
          searchParams.cookie, GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, searchParams.getExcludedBaseDNs());
      replicaUpdatesCursor = new ECLMultiDomainDBCursor(domainPredicate, cursor);
      boolean continueSearch = sendCookieEntriesFromCursor(entrySender, replicaUpdatesCursor, cookie);
      final boolean continueSearch = sendCookieEntriesFromCursor(entrySender, replicaUpdatesCursor);
      if (continueSearch)
      {
        entrySender.transitioningToPersistentSearchPhase();
        sendCookieEntriesFromCursor(entrySender, replicaUpdatesCursor, cookie);
        sendCookieEntriesFromCursor(entrySender, replicaUpdatesCursor);
      }
    }
    finally
@@ -1026,15 +1023,14 @@
  }
  /**
   * Walks the supplied cursor and hands every record it yields to the entry sender.
   * <p>
   * For each record, the update message ({@code getRecord()}) and its base DN ({@code getData()})
   * are read from the cursor and passed to {@code entrySender.initialSearchSendEntry(...)}.
   * Iteration stops as soon as the cursor has no further record or the entry sender returns
   * {@code false}.
   * <p>
   * NOTE(review): two parameter lists and two {@code initialSearchSendEntry(...)} calls appear
   * below, one with and one without a {@code cookie} argument. This looks like both sides of a
   * change shown together - confirm which variant is the current one.
   *
   * @param entrySender
   *          the sender receiving each update message together with its base DN
   * @param replicaUpdatesCursor
   *          the cursor supplying the update messages and their base DNs
   * @return {@code false} if the entry sender returned {@code false} for a record,
   *         {@code true} if the cursor was exhausted without that happening
   * @throws ChangelogException
   *           declared by the signature; presumably raised while reading the cursor - confirm
   * @throws DirectoryException
   *           declared by the signature; presumably raised while sending an entry - confirm
   */
  private boolean sendCookieEntriesFromCursor(final CookieEntrySender entrySender,
      ECLMultiDomainDBCursor replicaUpdatesCursor, final MultiDomainServerState cookie) throws ChangelogException,
      DirectoryException
      final ECLMultiDomainDBCursor replicaUpdatesCursor) throws ChangelogException, DirectoryException
  {
    boolean continueSearch = true;
    // the continueSearch test comes first, so the cursor is not advanced once the sender said stop
    while (continueSearch && replicaUpdatesCursor.next())
    {
      final UpdateMsg updateMsg = replicaUpdatesCursor.getRecord();
      final DN domainBaseDN = replicaUpdatesCursor.getData();
      continueSearch = entrySender.initialSearchSendEntry(updateMsg, domainBaseDN, cookie);
      continueSearch = entrySender.initialSearchSendEntry(updateMsg, domainBaseDN);
    }
    return continueSearch;
  }
@@ -1053,9 +1049,10 @@
  /** {@inheritDoc} */
  @Override
  public void registerPersistentSearch(PersistentSearch pSearch)
  public void registerPersistentSearch(PersistentSearch pSearch) throws DirectoryException
  {
    initializeAttachements(pSearch);
    validatePersistentSearch(pSearch);
    initializeEntrySender(pSearch);
    if (isCookieBased(pSearch.getSearchOperation()))
    {
@@ -1068,22 +1065,41 @@
    super.registerPersistentSearch(pSearch);
  }
  private void initializeAttachements(PersistentSearch pSearch)
  private void validatePersistentSearch(final PersistentSearch pSearch) throws DirectoryException
  {
    // Validation must be done during registration for changes only persistent searches.
    // Otherwise, when there is an initial search phase,
    // validation is performed by the search() method.
    if (pSearch.isChangesOnly())
    {
      final SearchOperation searchOperation = pSearch.getSearchOperation();
      checkChangelogReadPrivilege(searchOperation);
      final SearchParams params = buildSearchParameters(searchOperation);
      // next line also validates some search parameters
      optimizeSearchParameters(params, searchOperation.getBaseDN(), searchOperation.getFilter());
      validateProvidedCookie(params);
    }
  }
  /**
   * Creates the entry sender for a persistent search and stores it on the search operation
   * under {@code ENTRY_SENDER_ATTACHMENT}.
   * <p>
   * The start phase is computed as {@code SearchPhase.PERSISTENT} for a changes-only persistent
   * search and {@code SearchPhase.INITIAL} otherwise. Cookie based searches get a
   * {@code CookieEntrySender}, all other searches get a {@code ChangeNumberEntrySender}.
   * <p>
   * NOTE(review): this body contains several statements that set the same attachment twice,
   * once with {@code startPhase} and once with a hard-coded {@code SearchPhase.PERSISTENT}.
   * This looks like both sides of a change shown together - confirm which lines are current
   * before relying on the description above.
   *
   * @param pSearch
   *          the persistent search for which the entry sender is created
   */
  private void initializeEntrySender(PersistentSearch pSearch)
  {
    final SearchPhase startPhase = pSearch.isChangesOnly() ? SearchPhase.PERSISTENT : SearchPhase.INITIAL;
    final SearchOperation searchOp = pSearch.getSearchOperation();
    if (isCookieBased(searchOp))
    {
      final CookieEntrySender entrySender = new CookieEntrySender(searchOp, startPhase);
      searchOp.setAttachment(ENTRY_SENDER_ATTACHMENT, entrySender);
      if (pSearch.isChangesOnly())
      {
        // this changesOnly persistent search will not go through #initialSearch()
        // so we must initialize the cookie here
        // NOTE(review): getNewestCookie(searchOp) is called twice below, once for the
        // attachment and once for the entry sender - confirm whether both are meant to stay
        searchOp.setAttachment(COOKIE_ATTACHMENT, getNewestCookie(searchOp));
        entrySender.setCookie(getNewestCookie(searchOp));
      }
      // NOTE(review): the next line stores a second CookieEntrySender under the same attachment
      // key as above, with a fixed PERSISTENT phase and without setCookie() - confirm
      searchOp.setAttachment(ENTRY_SENDER_ATTACHMENT, new CookieEntrySender(searchOp, SearchPhase.PERSISTENT));
    }
    else
    {
      // NOTE(review): same attachment set twice, the second call uses startPhase - confirm
      searchOp.setAttachment(ENTRY_SENDER_ATTACHMENT, new ChangeNumberEntrySender(searchOp, SearchPhase.PERSISTENT));
      searchOp.setAttachment(ENTRY_SENDER_ATTACHMENT, new ChangeNumberEntrySender(searchOp, startPhase));
    }
  }
@@ -1762,7 +1778,7 @@
      sendEntryData.finalizeInitialSearch();
    }
    /**
     * Forwards the transition to the persistent search phase to {@code sendEntryData}.
     * <p>
     * NOTE(review): two signature lines follow, one {@code public} and one {@code private}.
     * This looks like both sides of a visibility change shown together - confirm which one is
     * current.
     */
    public void transitioningToPersistentSearchPhase()
    private void transitioningToPersistentSearchPhase()
    {
      sendEntryData.transitioningToPersistentSearchPhase();
    }
@@ -1792,6 +1808,7 @@
  private static class CookieEntrySender {
    private final SearchOperation searchOp;
    private final SearchPhase startPhase;
    private MultiDomainServerState cookie;
    private final ConcurrentSkipListMap<Pair<DN, Integer>, SendEntryData<CSN>> replicaIdToSendEntryData =
        new ConcurrentSkipListMap<Pair<DN, Integer>, SendEntryData<CSN>>(Pair.COMPARATOR);
@@ -1801,7 +1818,12 @@
      this.startPhase = startPhase;
    }
    public void finalizeInitialSearch()
    private void setCookie(MultiDomainServerState cookie)
    {
      this.cookie = cookie;
    }
    private void finalizeInitialSearch()
    {
      for (SendEntryData<CSN> sendEntryData : replicaIdToSendEntryData.values())
      {
@@ -1809,7 +1831,7 @@
      }
    }
    public void transitioningToPersistentSearchPhase()
    private void transitioningToPersistentSearchPhase()
    {
      for (SendEntryData<CSN> sendEntryData : replicaIdToSendEntryData.values())
      {
@@ -1830,13 +1852,12 @@
      return data;
    }
    /**
     * Handles one update message during the initial search phase.
     * <p>
     * The steps are: look up the send-entry data for the base DN and CSN, notify it through
     * {@code initialSearchSendsEntry(csn)}, obtain the cookie string from {@code updateCookie(...)},
     * build an entry from the message with {@code createEntryFromMsg(...)} and pass it to
     * {@code sendEntryIfMatches(...)}.
     * <p>
     * NOTE(review): two signatures and two {@code cookieString} declarations appear below, one
     * taking the cookie as a parameter and one not. This looks like both sides of a change shown
     * together - confirm which variant is current.
     *
     * @param updateMsg
     *          the update message to turn into an entry
     * @param baseDN
     *          the base DN associated with the update message
     * @return the value returned by {@code sendEntryIfMatches(...)}; the visible caller treats
     *         {@code false} as "stop the search"
     * @throws DirectoryException
     *           declared by the signature; presumably raised while building or sending the entry
     *           - confirm
     */
    private boolean initialSearchSendEntry(final UpdateMsg updateMsg, final DN baseDN,
        final MultiDomainServerState cookie) throws DirectoryException
    private boolean initialSearchSendEntry(final UpdateMsg updateMsg, final DN baseDN) throws DirectoryException
    {
      final CSN csn = updateMsg.getCSN();
      final SendEntryData<CSN> sendEntryData = getSendEntryData(baseDN, csn);
      sendEntryData.initialSearchSendsEntry(csn);
      final String cookieString = updateCookie(cookie, baseDN, updateMsg.getCSN());
      final String cookieString = updateCookie(baseDN, updateMsg.getCSN());
      // the literal 0 is the second argument of createEntryFromMsg; its meaning is not visible here
      final Entry entry = createEntryFromMsg(baseDN, 0, cookieString, updateMsg);
      return sendEntryIfMatches(searchOp, entry, cookieString);
    }
@@ -1849,9 +1870,7 @@
      if (sendEntryData.persistentSearchCanSendEntry(csn))
      {
        // multi threaded case: wait for the "initial search" phase to set the cookie
        final MultiDomainServerState cookie = searchOp.getAttachment(COOKIE_ATTACHMENT);
        final String cookieString = updateCookie(cookie, baseDN, updateMsg.getCSN());
        final String cookieString = updateCookie(baseDN, updateMsg.getCSN());
        final Entry cookieEntry = createEntryFromMsg(baseDN, 0, cookieString, updateMsg);
        // FIXME JNR use this instead of previous line:
        // entry.replaceAttribute(Attributes.create("changelogcookie", cookieString));
@@ -1859,7 +1878,7 @@
      }
    }
    private String updateCookie(final MultiDomainServerState cookie, DN baseDN, final CSN csn)
    private String updateCookie(DN baseDN, final CSN csn)
    {
      synchronized (cookie)
      { // forbid concurrent updates to the cookie