From 7067ad3cb534be96af07817eb5e9e270ae6efcd2 Mon Sep 17 00:00:00 2001
From: ludovicp <ludovicp@localhost>
Date: Wed, 18 Aug 2010 13:27:32 +0000
Subject: [PATCH] First pass on solving issue #514, reducing Replication meta data, especially aged information. The default is to keep the replication meta data (historical information) for at least 1 day. Purging occurs on the fly when entries are modified, or via a task. Launching the task will be available through dsreplication in separate commit.
---
opendj-sdk/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java | 169 ++++++++++++++++++++++++++++++++++++++++++++++++++++---
1 files changed, 158 insertions(+), 11 deletions(-)
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
index 7ad8093..f4bc7a4 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
@@ -24,6 +24,7 @@
*
* Copyright 2006-2010 Sun Microsystems, Inc.
*/
+
package org.opends.server.replication.plugin;
import static org.opends.messages.ReplicationMessages.*;
@@ -62,9 +63,13 @@
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.zip.DataFormatException;
+import java.util.Date;
+import org.opends.server.util.TimeThread;
+import org.opends.messages.Category;
import org.opends.messages.Message;
import org.opends.messages.MessageBuilder;
+import org.opends.messages.Severity;
import org.opends.server.admin.server.ConfigurationChangeListener;
import org.opends.server.admin.std.meta.ReplicationDomainCfgDefn.*;
import org.opends.server.admin.std.server.ExternalChangelogDomainCfg;
@@ -162,6 +167,7 @@
import org.opends.server.util.LDIFReader;
import org.opends.server.workflowelement.externalchangelog.ECLWorkflowElement;
import org.opends.server.workflowelement.localbackend.*;
+import org.opends.server.tasks.PurgeConflictsHistoricalTask;
/**
* This class implements the bulk part of the.of the Directory Server side
@@ -239,7 +245,7 @@
*/
public static final String DS_SYNC_CONFLICT = "ds-sync-conflict";
- /**
+ /**
* The tracer object for the debug logger.
*/
private static final DebugTracer TRACER = getTracer();
@@ -408,6 +414,18 @@
// ChangeNumbers.
private boolean logChangeNumber = false;
+ // This configuration integer indicates the time the domain keeps the
+ // historical information necessary to solve conflicts.
+ // When a change stored in the historical part of the user entry has a date
+ // (from its replication ChangeNumber) older than this delay, it is candidate
+ // to be purged.
+ private long histPurgeDelayInMilliSec = 0;
+
+ // The last change number purged in this domain. Allows to have a continuous
+ // purging process from one purge processing (task run) to the next one.
+ // Values 0 when the server starts.
+ private ChangeNumber lastChangeNumberPurgedFromHist = new ChangeNumber(0,0,0);
+
/**
* The thread that periodically saves the ServerState of this
* LDAPReplicationDomain in the database.
@@ -547,14 +565,17 @@
// Read the configuration parameters.
Set<String> replicationServers = configuration.getReplicationServer();
- serverId = configuration.getServerId();
- baseDn = configuration.getBaseDN();
+
+ this.serverId = configuration.getServerId();
+ this.baseDn = configuration.getBaseDN();
int window = configuration.getWindowSize();
heartbeatInterval = configuration.getHeartbeatInterval();
- isolationpolicy = configuration.getIsolationPolicy();
- configDn = configuration.dn();
- logChangeNumber = configuration.isLogChangenumber();
+ this.isolationpolicy = configuration.getIsolationPolicy();
+ this.configDn = configuration.dn();
+ this.logChangeNumber = configuration.isLogChangenumber();
this.updateToReplayQueue = updateToReplayQueue;
+ this.histPurgeDelayInMilliSec =
+ configuration.getConflictsHistoricalPurgeDelay()*60*1000;
// Get assured configuration
readAssuredConfig(configuration, false);
@@ -2423,6 +2444,12 @@
* Check if the operation that just happened has cleared a conflict :
* Clearing a conflict happens if the operation has free a DN that
* for which an other entry was in conflict.
+ * Steps:
+ * - get the DN freed by a DELETE or MODRDN op
+ * - search for entries put in the conflict space (dn=entryuid'+'....)
+ * because the expected DN was not available (ds-sync-conflict=expected DN)
+ * - retain the entry with the oldest conflict
+ * - rename this entry with the freedDN as it was expected originally
*/
private void checkForClearedConflict(PostOperationOperation op)
{
@@ -2433,14 +2460,14 @@
return;
}
- DN targetDN;
+ DN freedDN;
if (type == OperationType.DELETE)
{
- targetDN = ((PostOperationDeleteOperation) op).getEntryDN();
+ freedDN = ((PostOperationDeleteOperation) op).getEntryDN();
}
else if (type == OperationType.MODIFY_DN)
{
- targetDN = ((PostOperationModifyDNOperation) op).getEntryDN();
+ freedDN = ((PostOperationModifyDNOperation) op).getEntryDN();
}
else
{
@@ -2451,7 +2478,7 @@
try
{
filter = LDAPFilter.decode(
- DS_SYNC_CONFLICT + "=" + targetDN.toNormalizedString());
+ DS_SYNC_CONFLICT + "=" + freedDN.toNormalizedString());
} catch (LDAPException e)
{
// Not possible. We know the filter just above is correct.
@@ -2491,7 +2518,7 @@
{
DN entryDN = entrytoRename.getDN();
ModifyDNOperationBasis newOp = renameEntry(
- entryDN, targetDN.getRDN(), targetDN.getParent(), false);
+ entryDN, freedDN.getRDN(), freedDN.getParent(), false);
ResultCode res = newOp.getResultCode();
if (res != ResultCode.SUCCESS)
@@ -4410,6 +4437,8 @@
{
isolationpolicy = configuration.getIsolationPolicy();
logChangeNumber = configuration.isLogChangenumber();
+ histPurgeDelayInMilliSec =
+ configuration.getConflictsHistoricalPurgeDelay()*60*1000;
changeConfig(
configuration.getReplicationServer(),
@@ -5588,4 +5617,122 @@
{
return this.eclDomain.isEnabled();
}
+
+ /**
+ * Return the purge delay (in ms) for the historical information stored
+ * in entries to solve conflicts for this domain.
+ *
+ * @return the purge delay.
+ */
+ public long getHistoricalPurgeDelay()
+ {
+ return histPurgeDelayInMilliSec;
+ }
+
+ /**
+ * Check if the operation that just happened has cleared a conflict :
+ * Clearing a conflict happens if the operation has free a DN that
+ * for which an other entry was in conflict.
+ * Steps:
+ * - get the DN freed by a DELETE or MODRDN op
+ * - search for entries put in the conflict space (dn=entryuid'+'....)
+ * because the expected DN was not available (ds-sync-conflict=expected DN)
+ * - retain the entry with the oldest conflict
+ * - rename this entry with the freedDN as it was expected originally
+ *
+ * @param task the task raising this purge.
+ * @param endDate the date to stop this task whether the job is done or not.
+ * @throws DirectoryException when an exception happens.
+ *
+ */
+ public void purgeConflictsHistorical(PurgeConflictsHistoricalTask task,
+ long endDate)
+ throws DirectoryException
+ {
+ LDAPFilter filter = null;
+
+ TRACER.debugInfo("[PURGE] purgeConflictsHistorical "
+ + "on domain: " + baseDn
+ + "endDate:" + new Date(endDate)
+ + "lastChangeNumberPurgedFromHist: "
+ + lastChangeNumberPurgedFromHist.toStringUI());
+
+ try
+ {
+ filter = LDAPFilter.decode(
+ "(" + EntryHistorical.HISTORICALATTRIBUTENAME + ">=dummy:"
+ + lastChangeNumberPurgedFromHist + ")");
+
+ } catch (LDAPException e)
+ {
+ // Not possible. We know the filter just above is correct.
+ }
+
+ LinkedHashSet<String> attrs = new LinkedHashSet<String>(1);
+ attrs.add(EntryHistorical.HISTORICALATTRIBUTENAME);
+ attrs.add(EntryHistorical.ENTRYUIDNAME);
+ attrs.add("*");
+ InternalSearchOperation searchOp = conn.processSearch(
+ ByteString.valueOf(baseDn.toString()),
+ SearchScope.WHOLE_SUBTREE,
+ DereferencePolicy.NEVER_DEREF_ALIASES,
+ 0, 0, false, filter,
+ attrs, null);
+
+ int count = 0;
+ task.setProgressStats(lastChangeNumberPurgedFromHist, count);
+
+ LinkedList<SearchResultEntry> entries = searchOp.getSearchEntries();
+ for (SearchResultEntry entry : entries)
+ {
+ long maxTimeToRun = endDate - TimeThread.getTime();
+ if (maxTimeToRun<0)
+ {
+ Message errMsg = Message.raw(Category.SYNC, Severity.NOTICE,
+ " end date reached");
+ DirectoryException de = new DirectoryException(
+ ResultCode.ADMIN_LIMIT_EXCEEDED,
+ errMsg);
+ throw (de);
+ }
+
+ EntryHistorical entryHist = EntryHistorical.newInstanceFromEntry(entry);
+ lastChangeNumberPurgedFromHist = entryHist.getOldestCN();
+ entryHist.setPurgeDelay(this.histPurgeDelayInMilliSec);
+ Attribute attr = entryHist.encodeAndPurge();
+ count += entryHist.getLastPurgedValuesCount();
+ List<Modification> mods = new LinkedList<Modification>();
+ Modification mod;
+ mod = new Modification(ModificationType.REPLACE, attr);
+ mods.add(mod);
+
+ ModifyOperationBasis newOp =
+ new ModifyOperationBasis(
+ conn, InternalClientConnection.nextOperationID(),
+ InternalClientConnection.nextMessageID(),
+ new ArrayList<Control>(0),
+ entry.getDN(),
+ mods);
+ newOp.setInternalOperation(true);
+ newOp.setSynchronizationOperation(true);
+ newOp.setDontSynchronize(true);
+
+ newOp.run();
+
+ if (newOp.getResultCode() != ResultCode.SUCCESS)
+ {
+ // Log information for the repair tool.
+ MessageBuilder mb = new MessageBuilder();
+ mb.append(ERR_CANNOT_ADD_CONFLICT_ATTRIBUTE.get());
+ mb.append(String.valueOf(newOp));
+ mb.append(" ");
+ mb.append(String.valueOf(newOp.getResultCode()));
+ logError(mb.toMessage());
+ }
+ else
+ {
+ task.setProgressStats(lastChangeNumberPurgedFromHist, count);
+ }
+ }
+ }
}
--
Gitblit v1.10.0