From c35cd58f1b2cf662cbdbb531ee80f676d87ba0de Mon Sep 17 00:00:00 2001
From: ian.packer <ian.packer@forgerock.com>
Date: Thu, 25 Feb 2016 14:21:15 +0000
Subject: [PATCH] OPENDJ-2446: Improve the scalability of the dsreplication purge-historical task

---
 opendj-server-legacy/src/main/java/org/opends/server/replication/plugin/LDAPReplicationDomain.java |  137 +++++++++++++++++++++++++++++----------------
 1 files changed, 87 insertions(+), 50 deletions(-)

diff --git a/opendj-server-legacy/src/main/java/org/opends/server/replication/plugin/LDAPReplicationDomain.java b/opendj-server-legacy/src/main/java/org/opends/server/replication/plugin/LDAPReplicationDomain.java
index db55d13..4737efc 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/replication/plugin/LDAPReplicationDomain.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/replication/plugin/LDAPReplicationDomain.java
@@ -80,6 +80,8 @@
 import org.opends.server.api.ServerShutdownListener;
 import org.opends.server.api.SynchronizationProvider;
 import org.opends.server.backends.task.Task;
+import org.opends.server.config.ConfigConstants;
+import org.opends.server.controls.PagedResultsControl;
 import org.opends.server.core.AddOperation;
 import org.opends.server.core.DeleteOperation;
 import org.opends.server.core.DirectoryServer;
@@ -4897,19 +4899,13 @@
   }
 
   /**
-   * Check if the operation that just happened has cleared a conflict : Clearing
-   * a conflict happens if the operation has freed a DN for which another entry
-   * was in conflict.
-   * <p>
-   * Steps:
-   * <ul>
-   * <li>get the DN freed by a DELETE or MODRDN op</li>
-   * <li>search for entries put in the conflict space (dn=entryUUID'+'....)
-   * because the expected DN was not available (ds-sync-conflict=expected DN)
-   * </li>
-   * <li>retain the entry with the oldest conflict</li>
-   * <li>rename this entry with the freedDN as it was expected originally</li>
-   * </ul>
+   * Check and purge the historical attribute on all eligible entries under this domain.
+   *
+   * The purging logic is the same applied to individual entries during modify operations. This
+   * task may be useful in scenarios where a large number of changes are made as a one-off occurrence.
+   * Running a purge-historical after the 'ds-cfg-conflicts-historical-purge-delay' period has elapsed
+   * would clear out obsolete historical data from all the modified entries reducing the overall
+   * database size.
    *
    * @param task
    *          the task raising this purge.
@@ -4921,53 +4917,94 @@
   public void purgeConflictsHistorical(PurgeConflictsHistoricalTask task,
       long endDate) throws DirectoryException
   {
-     logger.trace("[PURGE] purgeConflictsHistorical "
+    logger.trace("[PURGE] purgeConflictsHistorical "
          + "on domain: " + getBaseDN()
          + "endDate:" + new Date(endDate)
          + "lastCSNPurgedFromHist: "
          + lastCSNPurgedFromHist.toStringUI());
 
+
+    // It would be nice to have an upper bound on this filter to eliminate results that don't have a purgeable
+    // csn in them. However, historicalCsnOrderingMatch keys start with serverid rather than timestamp so this
+    // isn't possible.
     String filter = "(" + HISTORICAL_ATTRIBUTE_NAME + ">=dummy:" + lastCSNPurgedFromHist + ")";
-    SearchRequest request = Requests.newSearchRequest(getBaseDN(), SearchScope.WHOLE_SUBTREE, filter)
-        .addAttribute(USER_AND_REPL_OPERATIONAL_ATTRS);
-    InternalSearchOperation searchOp = conn.processSearch(request);
 
-     int count = 0;
-     if (task != null)
-     {
-       task.setProgressStats(lastCSNPurgedFromHist, count);
-     }
+    int count = 0;
+    boolean finished = false;
+    ByteString pagingCookie = null;
 
-     for (SearchResultEntry entry : searchOp.getSearchEntries())
-     {
-       long maxTimeToRun = endDate - TimeThread.getTime();
-       if (maxTimeToRun < 0)
-       {
-        throw new DirectoryException(ResultCode.ADMIN_LIMIT_EXCEEDED,
-            LocalizableMessage.raw(" end date reached"));
-       }
+    while(!finished)
+    {
+      if (task != null)
+      {
+        task.setProgressStats(lastCSNPurgedFromHist, count);
+      }
 
-       EntryHistorical entryHist = EntryHistorical.newInstanceFromEntry(entry);
-       lastCSNPurgedFromHist = entryHist.getOldestCSN();
-       entryHist.setPurgeDelay(getHistoricalPurgeDelay());
-       Attribute attr = entryHist.encodeAndPurge();
-       count += entryHist.getLastPurgedValuesCount();
-       List<Modification> mods = newArrayList(new Modification(ModificationType.REPLACE, attr));
+      finished = true;
 
-       ModifyOperation newOp = new ModifyOperationBasis(
-           conn, nextOperationID(), nextMessageID(), new ArrayList<Control>(0),
-           entry.getName(), mods);
-       runAsSynchronizedOperation(newOp);
 
-       if (newOp.getResultCode() != ResultCode.SUCCESS)
-       {
-         // Log information for the repair tool.
-         logger.error(ERR_CANNOT_ADD_CONFLICT_ATTRIBUTE, newOp, newOp.getResultCode());
-       }
-       else if (task != null)
-       {
-         task.setProgressStats(lastCSNPurgedFromHist, count);
-       }
-     }
+      SearchRequest request = Requests.newSearchRequest(getBaseDN(), SearchScope.WHOLE_SUBTREE, filter)
+          .addAttribute(USER_AND_REPL_OPERATIONAL_ATTRS)
+          .addControl(new PagedResultsControl(false, ConfigConstants.DEFAULT_SIZE_LIMIT, pagingCookie))
+          .setSizeLimit(ConfigConstants.DEFAULT_SIZE_LIMIT + 1);
+
+      InternalSearchOperation searchOp = conn.processSearch(request);
+
+      for (Control c : searchOp.getResponseControls())
+      {
+        if (c.getOID().equals(OID_PAGED_RESULTS_CONTROL))
+        {
+          ByteString newPagingCookie = ((PagedResultsControl)c).getCookie();
+
+          if( newPagingCookie != null &&
+              newPagingCookie.length() > 0 &&
+              !newPagingCookie.equals(pagingCookie))
+          {
+            pagingCookie = newPagingCookie;
+            finished = false;
+          }
+        }
+      }
+
+      for (SearchResultEntry entry : searchOp.getSearchEntries())
+      {
+        long maxTimeToRun = endDate - TimeThread.getTime();
+        if (maxTimeToRun < 0) {
+          throw new DirectoryException(ResultCode.ADMIN_LIMIT_EXCEEDED,
+              LocalizableMessage.raw(" end date reached"));
+        }
+
+        EntryHistorical entryHist = EntryHistorical.newInstanceFromEntry(entry);
+
+        CSN latestOldCSN = entryHist.getOldestCSN();
+        entryHist.setPurgeDelay(getHistoricalPurgeDelay());
+        Attribute attr = entryHist.encodeAndPurge();
+
+        if(entryHist.getLastPurgedValuesCount() > 0)
+        {
+          lastCSNPurgedFromHist = latestOldCSN;
+          List<Modification> mods = newArrayList(new Modification(ModificationType.REPLACE, attr));
+          count += entryHist.getLastPurgedValuesCount();
+          ModifyOperation newOp = new ModifyOperationBasis(
+              conn, nextOperationID(), nextMessageID(), new ArrayList<Control>(0),
+              entry.getName(), mods);
+          runAsSynchronizedOperation(newOp);
+
+          if (newOp.getResultCode() != ResultCode.SUCCESS)
+          {
+            // Log information for the repair tool.
+            logger.error(ERR_CANNOT_ADD_CONFLICT_ATTRIBUTE, newOp, newOp.getResultCode());
+          }
+          else if (task != null)
+          {
+            task.setProgressStats(lastCSNPurgedFromHist, count);
+          }
+        }
+      }
+    }
+    // If a full sweep was completed, the lastCSNPurgedFromHist must be reset so that the next
+    // run-through starts from the beginning. Otherwise, subsequent runs of the task would only
+    // pick up purgeable changes for the last server id.
+    lastCSNPurgedFromHist = new CSN(0,0,0);
   }
 }

--
Gitblit v1.10.0