mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

ian.packer
23.10.2016 c35cd58f1b2cf662cbdbb531ee80f676d87ba0de
OPENDJ-2446: Improve the scalability of the dsreplication purge-historical task

* Use a paged results control on the internal search to reduce the memory consumption of this task (see the sketch after this list).
* Only run modify operations against entries whose historical attribute value is actually being changed (a significant performance improvement).
* Amended Javadoc to describe what purgeConflictsHistorical actually does.
* lastCSNPurgedFromHist is now much more accurate: previously it could be set to a CSN that was checked but not actually purged. As a result, repeated runs of the task are more reliable.
* Reset lastCSNPurgedFromHist after a full run completes successfully.
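
As a rough sketch of the paging change in the first bullet (a minimal, hypothetical stand-in: Page and fetchPage are illustrative only and not OpenDJ API; the real loop in the patch below drives an internal search with PagedResultsControl and a ConfigConstants.DEFAULT_SIZE_LIMIT page size):

// Sketch of the simple-paged-results loop shape used by the new purge code.
// fetchPage() is a hypothetical stand-in for the internal paged search.
import java.util.List;

public class PagedSweepSketch
{
  /** One page of results plus the cookie the server returned (empty = last page). */
  record Page(List<String> entryDns, byte[] cookie) {}

  /** Hypothetical page fetch; a real implementation would issue the paged search. */
  static Page fetchPage(byte[] cookie)
  {
    return new Page(List.of(), new byte[0]);
  }

  public static void main(String[] args)
  {
    byte[] cookie = new byte[0];
    boolean finished = false;
    while (!finished)
    {
      Page page = fetchPage(cookie);
      // ... purge historical values from page.entryDns() here ...
      // An empty (or unchanged) cookie means the server has no more pages to return.
      finished = page.cookie().length == 0;
      cookie = page.cookie();
    }
  }
}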
1 file modified (79 lines changed):
opendj-server-legacy/src/main/java/org/opends/server/replication/plugin/LDAPReplicationDomain.java
opendj-server-legacy/src/main/java/org/opends/server/replication/plugin/LDAPReplicationDomain.java
@@ -80,6 +80,8 @@
 import org.opends.server.api.ServerShutdownListener;
 import org.opends.server.api.SynchronizationProvider;
 import org.opends.server.backends.task.Task;
+import org.opends.server.config.ConfigConstants;
+import org.opends.server.controls.PagedResultsControl;
 import org.opends.server.core.AddOperation;
 import org.opends.server.core.DeleteOperation;
 import org.opends.server.core.DirectoryServer;
@@ -4897,19 +4899,13 @@
   }
   /**
-   * Check if the operation that just happened has cleared a conflict : Clearing
-   * a conflict happens if the operation has freed a DN for which another entry
-   * was in conflict.
-   * <p>
-   * Steps:
-   * <ul>
-   * <li>get the DN freed by a DELETE or MODRDN op</li>
-   * <li>search for entries put in the conflict space (dn=entryUUID'+'....)
-   * because the expected DN was not available (ds-sync-conflict=expected DN)
-   * </li>
-   * <li>retain the entry with the oldest conflict</li>
-   * <li>rename this entry with the freedDN as it was expected originally</li>
-   * </ul>
+   * Check and purge the historical attribute on all eligible entries under this domain.
+   *
+   * The purging logic is the same as that applied to individual entries during modify operations. This
+   * task may be useful in scenarios where a large number of changes are made as a one-off occurrence.
+   * Running a purge-historical after the 'ds-cfg-conflicts-historical-purge-delay' period has elapsed
+   * would clear out obsolete historical data from all the modified entries, reducing the overall
+   * database size.
    *
    * @param task
    *          the task raising this purge.
@@ -4927,33 +4923,68 @@
         + "lastCSNPurgedFromHist: "
         + lastCSNPurgedFromHist.toStringUI());
    // It would be nice to have an upper bound on this filter to eliminate results that don't have a purgeable
    // csn in them. However, historicalCsnOrderingMatch keys start with serverid rather than timestamp so this
    // isn't possible.
    String filter = "(" + HISTORICAL_ATTRIBUTE_NAME + ">=dummy:" + lastCSNPurgedFromHist + ")";
    SearchRequest request = Requests.newSearchRequest(getBaseDN(), SearchScope.WHOLE_SUBTREE, filter)
        .addAttribute(USER_AND_REPL_OPERATIONAL_ATTRS);
    InternalSearchOperation searchOp = conn.processSearch(request);
     int count = 0;
    boolean finished = false;
    ByteString pagingCookie = null;
    while(!finished)
    {
     if (task != null)
     {
       task.setProgressStats(lastCSNPurgedFromHist, count);
     }
      finished = true;
      SearchRequest request = Requests.newSearchRequest(getBaseDN(), SearchScope.WHOLE_SUBTREE, filter)
          .addAttribute(USER_AND_REPL_OPERATIONAL_ATTRS)
          .addControl(new PagedResultsControl(false, ConfigConstants.DEFAULT_SIZE_LIMIT, pagingCookie))
          .setSizeLimit(ConfigConstants.DEFAULT_SIZE_LIMIT + 1);
      InternalSearchOperation searchOp = conn.processSearch(request);
      for (Control c : searchOp.getResponseControls())
      {
        if (c.getOID().equals(OID_PAGED_RESULTS_CONTROL))
        {
          ByteString newPagingCookie = ((PagedResultsControl)c).getCookie();
          if( newPagingCookie != null &&
              newPagingCookie.length() > 0 &&
              !newPagingCookie.equals(pagingCookie))
          {
            pagingCookie = newPagingCookie;
            finished = false;
          }
        }
      }
     for (SearchResultEntry entry : searchOp.getSearchEntries())
     {
       long maxTimeToRun = endDate - TimeThread.getTime();
       if (maxTimeToRun < 0)
       {
        if (maxTimeToRun < 0) {
        throw new DirectoryException(ResultCode.ADMIN_LIMIT_EXCEEDED,
            LocalizableMessage.raw(" end date reached"));
       }
       EntryHistorical entryHist = EntryHistorical.newInstanceFromEntry(entry);
       lastCSNPurgedFromHist = entryHist.getOldestCSN();
        CSN latestOldCSN = entryHist.getOldestCSN();
       entryHist.setPurgeDelay(getHistoricalPurgeDelay());
       Attribute attr = entryHist.encodeAndPurge();
       count += entryHist.getLastPurgedValuesCount();
       List<Modification> mods = newArrayList(new Modification(ModificationType.REPLACE, attr));
        if(entryHist.getLastPurgedValuesCount() > 0)
        {
          lastCSNPurgedFromHist = latestOldCSN;
          List<Modification> mods = newArrayList(new Modification(ModificationType.REPLACE, attr));
          count += entryHist.getLastPurgedValuesCount();
       ModifyOperation newOp = new ModifyOperationBasis(
           conn, nextOperationID(), nextMessageID(), new ArrayList<Control>(0),
           entry.getName(), mods);
@@ -4971,3 +5002,9 @@
       }
-  }
-}
+    // If a full sweep was completed, the lastCSNPurgedFromHist must be reset so that the next
+    // run-through starts from the beginning. Otherwise, subsequent runs of the task would only
+    // pick up purgeable changes for the last server id.
+    lastCSNPurgedFromHist = new CSN(0,0,0);
+  }
+}
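
For reference, the entry-level decision behind the second, fourth and fifth bullets of the commit message reduces to the shape below (a simplified, hypothetical sketch: the names are illustrative stand-ins, while the real code works with EntryHistorical, CSNs and ModifyOperationBasis as shown in the patch above):

// Sketch: only apply a modify when values were actually purged, only then advance
// the purge cursor, and reset the cursor after a complete sweep.
import java.util.List;

public class ConditionalPurgeSketch
{
  /** Hypothetical view of one entry: its oldest historical CSN and how many values are purgeable. */
  record Historical(String oldestCsn, int purgeableValues) {}

  public static void main(String[] args)
  {
    List<Historical> entries =
        List.of(new Historical("csn-A", 3), new Historical("csn-B", 0));
    String lastCsnPurged = "none";
    int purgedCount = 0;
    for (Historical hist : entries)
    {
      if (hist.purgeableValues() == 0)
      {
        continue; // nothing to purge: skip the modify and leave the cursor alone
      }
      lastCsnPurged = hist.oldestCsn(); // the cursor only advances when values are really removed
      purgedCount += hist.purgeableValues();
      // ... a real run would apply a REPLACE modification to the entry here ...
    }
    // After a complete sweep, reset the cursor so the next run starts from the beginning.
    lastCsnPurged = "none";
    System.out.println("purged " + purgedCount + " values; cursor reset to " + lastCsnPurged);
  }
}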