From c35cd58f1b2cf662cbdbb531ee80f676d87ba0de Mon Sep 17 00:00:00 2001
From: ian.packer <ian.packer@forgerock.com>
Date: Thu, 25 Feb 2016 14:21:15 +0000
Subject: [PATCH] OPENDJ-2446: Improve the scalability of the dsreplication purge-historical task
---
opendj-server-legacy/src/main/java/org/opends/server/replication/plugin/LDAPReplicationDomain.java | 137 +++++++++++++++++++++++++++++----------------
1 files changed, 87 insertions(+), 50 deletions(-)
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/replication/plugin/LDAPReplicationDomain.java b/opendj-server-legacy/src/main/java/org/opends/server/replication/plugin/LDAPReplicationDomain.java
index db55d13..4737efc 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/replication/plugin/LDAPReplicationDomain.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/replication/plugin/LDAPReplicationDomain.java
@@ -80,6 +80,8 @@
import org.opends.server.api.ServerShutdownListener;
import org.opends.server.api.SynchronizationProvider;
import org.opends.server.backends.task.Task;
+import org.opends.server.config.ConfigConstants;
+import org.opends.server.controls.PagedResultsControl;
import org.opends.server.core.AddOperation;
import org.opends.server.core.DeleteOperation;
import org.opends.server.core.DirectoryServer;
@@ -4897,19 +4899,13 @@
}
/**
- * Check if the operation that just happened has cleared a conflict : Clearing
- * a conflict happens if the operation has freed a DN for which another entry
- * was in conflict.
- * <p>
- * Steps:
- * <ul>
- * <li>get the DN freed by a DELETE or MODRDN op</li>
- * <li>search for entries put in the conflict space (dn=entryUUID'+'....)
- * because the expected DN was not available (ds-sync-conflict=expected DN)
- * </li>
- * <li>retain the entry with the oldest conflict</li>
- * <li>rename this entry with the freedDN as it was expected originally</li>
- * </ul>
+ * Check and purge the historical attribute on all eligible entries under this domain.
+ *
+ * The purging logic is the same applied to individual entries during modify operations. This
+ * task may be useful in scenarios where a large number of changes are made as a one-off occurrence.
+ * Running a purge-historical after the 'ds-cfg-conflicts-historical-purge-delay' period has elapsed
+ * would clear out obsolete historical data from all the modified entries reducing the overall
+ * database size.
*
* @param task
* the task raising this purge.
@@ -4921,53 +4917,94 @@
public void purgeConflictsHistorical(PurgeConflictsHistoricalTask task,
long endDate) throws DirectoryException
{
- logger.trace("[PURGE] purgeConflictsHistorical "
+ logger.trace("[PURGE] purgeConflictsHistorical "
+ "on domain: " + getBaseDN()
+ "endDate:" + new Date(endDate)
+ "lastCSNPurgedFromHist: "
+ lastCSNPurgedFromHist.toStringUI());
+
+ // It would be nice to have an upper bound on this filter to eliminate results that don't have a purgeable
+ // csn in them. However, historicalCsnOrderingMatch keys start with serverid rather than timestamp so this
+ // isn't possible.
String filter = "(" + HISTORICAL_ATTRIBUTE_NAME + ">=dummy:" + lastCSNPurgedFromHist + ")";
- SearchRequest request = Requests.newSearchRequest(getBaseDN(), SearchScope.WHOLE_SUBTREE, filter)
- .addAttribute(USER_AND_REPL_OPERATIONAL_ATTRS);
- InternalSearchOperation searchOp = conn.processSearch(request);
- int count = 0;
- if (task != null)
- {
- task.setProgressStats(lastCSNPurgedFromHist, count);
- }
+ int count = 0;
+ boolean finished = false;
+ ByteString pagingCookie = null;
- for (SearchResultEntry entry : searchOp.getSearchEntries())
- {
- long maxTimeToRun = endDate - TimeThread.getTime();
- if (maxTimeToRun < 0)
- {
- throw new DirectoryException(ResultCode.ADMIN_LIMIT_EXCEEDED,
- LocalizableMessage.raw(" end date reached"));
- }
+ while(!finished)
+ {
+ if (task != null)
+ {
+ task.setProgressStats(lastCSNPurgedFromHist, count);
+ }
- EntryHistorical entryHist = EntryHistorical.newInstanceFromEntry(entry);
- lastCSNPurgedFromHist = entryHist.getOldestCSN();
- entryHist.setPurgeDelay(getHistoricalPurgeDelay());
- Attribute attr = entryHist.encodeAndPurge();
- count += entryHist.getLastPurgedValuesCount();
- List<Modification> mods = newArrayList(new Modification(ModificationType.REPLACE, attr));
+ finished = true;
- ModifyOperation newOp = new ModifyOperationBasis(
- conn, nextOperationID(), nextMessageID(), new ArrayList<Control>(0),
- entry.getName(), mods);
- runAsSynchronizedOperation(newOp);
- if (newOp.getResultCode() != ResultCode.SUCCESS)
- {
- // Log information for the repair tool.
- logger.error(ERR_CANNOT_ADD_CONFLICT_ATTRIBUTE, newOp, newOp.getResultCode());
- }
- else if (task != null)
- {
- task.setProgressStats(lastCSNPurgedFromHist, count);
- }
- }
+ SearchRequest request = Requests.newSearchRequest(getBaseDN(), SearchScope.WHOLE_SUBTREE, filter)
+ .addAttribute(USER_AND_REPL_OPERATIONAL_ATTRS)
+ .addControl(new PagedResultsControl(false, ConfigConstants.DEFAULT_SIZE_LIMIT, pagingCookie))
+ .setSizeLimit(ConfigConstants.DEFAULT_SIZE_LIMIT + 1);
+
+ InternalSearchOperation searchOp = conn.processSearch(request);
+
+ for (Control c : searchOp.getResponseControls())
+ {
+ if (c.getOID().equals(OID_PAGED_RESULTS_CONTROL))
+ {
+ ByteString newPagingCookie = ((PagedResultsControl)c).getCookie();
+
+ if( newPagingCookie != null &&
+ newPagingCookie.length() > 0 &&
+ !newPagingCookie.equals(pagingCookie))
+ {
+ pagingCookie = newPagingCookie;
+ finished = false;
+ }
+ }
+ }
+
+ for (SearchResultEntry entry : searchOp.getSearchEntries())
+ {
+ long maxTimeToRun = endDate - TimeThread.getTime();
+ if (maxTimeToRun < 0) {
+ throw new DirectoryException(ResultCode.ADMIN_LIMIT_EXCEEDED,
+ LocalizableMessage.raw(" end date reached"));
+ }
+
+ EntryHistorical entryHist = EntryHistorical.newInstanceFromEntry(entry);
+
+ CSN latestOldCSN = entryHist.getOldestCSN();
+ entryHist.setPurgeDelay(getHistoricalPurgeDelay());
+ Attribute attr = entryHist.encodeAndPurge();
+
+ if(entryHist.getLastPurgedValuesCount() > 0)
+ {
+ lastCSNPurgedFromHist = latestOldCSN;
+ List<Modification> mods = newArrayList(new Modification(ModificationType.REPLACE, attr));
+ count += entryHist.getLastPurgedValuesCount();
+ ModifyOperation newOp = new ModifyOperationBasis(
+ conn, nextOperationID(), nextMessageID(), new ArrayList<Control>(0),
+ entry.getName(), mods);
+ runAsSynchronizedOperation(newOp);
+
+ if (newOp.getResultCode() != ResultCode.SUCCESS)
+ {
+ // Log information for the repair tool.
+ logger.error(ERR_CANNOT_ADD_CONFLICT_ATTRIBUTE, newOp, newOp.getResultCode());
+ }
+ else if (task != null)
+ {
+ task.setProgressStats(lastCSNPurgedFromHist, count);
+ }
+ }
+ }
+ }
+ // If a full sweep was completed, the lastCSNPurgedFromHist must be reset so that the next
+ // run-through starts from the beginning. Otherwise, subsequent runs of the task would only
+ // pick up purgeable changes for the last server id.
+ lastCSNPurgedFromHist = new CSN(0,0,0);
}
}
--
Gitblit v1.10.0