opends/resource/config/config.ldif
@@ -65,6 +65,7 @@ ds-cfg-allowed-task: org.opends.server.tasks.RebuildTask ds-cfg-allowed-task: org.opends.server.tasks.RestoreTask ds-cfg-allowed-task: org.opends.server.tasks.ShutdownTask ds-cfg-allowed-task: org.opends.server.tasks.PurgeConflictsHistoricalTask dn: cn=Access Control Handler,cn=config objectClass: top opends/resource/schema/02-config.ldif
@@ -2490,6 +2490,41 @@ SYNTAX 1.3.6.1.4.1.1466.115.121.1.27 SINGLE-VALUE X-ORIGIN 'OpenDS Directory Server' ) attributeTypes: ( 1.3.6.1.4.1.26027.1.1.614 NAME 'ds-cfg-conflicts-historical-purge-delay' SYNTAX 1.3.6.1.4.1.1466.115.121.1.15 SINGLE-VALUE X-ORIGIN 'OpenDS Directory Server' ) attributeTypes: ( 1.3.6.1.4.1.26027.1.1.615 NAME 'ds-task-purge-conflicts-historical-domain-dn' SYNTAX 1.3.6.1.4.1.1466.115.121.1.15 SINGLE-VALUE X-ORIGIN 'OpenDS Directory Server' ) attributeTypes: ( 1.3.6.1.4.1.26027.1.1.616 NAME 'ds-task-purge-conflicts-historical-maximum-duration' SYNTAX 1.3.6.1.4.1.1466.115.121.1.15 SINGLE-VALUE X-ORIGIN 'OpenDS Directory Server' ) attributeTypes: ( 1.3.6.1.4.1.26027.1.1.617 NAME 'ds-task-purge-conflicts-historical-last-purged-changenumber' SYNTAX 1.3.6.1.4.1.1466.115.121.1.15 SINGLE-VALUE X-ORIGIN 'OpenDS Directory Server' ) attributeTypes: ( 1.3.6.1.4.1.26027.1.1.618 NAME 'ds-task-purge-conflicts-historical-purge-completed-in-time' SYNTAX 1.3.6.1.4.1.1466.115.121.1.15 SINGLE-VALUE X-ORIGIN 'OpenDS Directory Server' ) attributeTypes: ( 1.3.6.1.4.1.26027.1.1.619 NAME 'ds-task-purge-conflicts-historical-purged-values-count' SYNTAX 1.3.6.1.4.1.1466.115.121.1.15 SINGLE-VALUE X-ORIGIN 'OpenDS Directory Server' ) attributeTypes: ( 1.3.6.1.4.1.26027.1.1.620 NAME 'ds-task-purge-conflicts-historical-first-purged-changenumber' SYNTAX 1.3.6.1.4.1.1466.115.121.1.15 SINGLE-VALUE X-ORIGIN 'OpenDS Directory Server' ) objectClasses: ( 1.3.6.1.4.1.26027.1.2.1 NAME 'ds-cfg-access-control-handler' SUP top @@ -3057,6 +3092,7 @@ ds-cfg-fractional-exclude $ ds-cfg-fractional-include $ ds-cfg-solve-conflicts $ ds-cfg-conflicts-historical-purge-delay $ ds-cfg-changetime-heartbeat-interval $ ds-cfg-log-changenumber $ ds-cfg-initialization-window-size ) @@ -4196,4 +4232,15 @@ SUP ds-cfg-virtual-attribute STRUCTURAL X-ORIGIN 'OpenDS Directory Server' ) objectClasses: ( 1.3.6.1.4.1.26027.1.2.237 NAME 'ds-task-purge-conflicts-historical' SUP ds-task STRUCTURAL MUST ( ds-task-purge-conflicts-historical-domain-dn ) MAY ( ds-task-purge-conflicts-historical-maximum-duration $ ds-task-purge-conflicts-historical-first-purged-changenumber $ ds-task-purge-conflicts-historical-last-purged-changenumber $ ds-task-purge-conflicts-historical-purge-completed-in-time $ ds-task-purge-conflicts-historical-purged-values-count ) X-ORIGIN 'OpenDS Directory Server' ) opends/src/admin/defn/org/opends/server/admin/std/ReplicationDomainConfiguration.xml
@@ -536,4 +536,26 @@ </ldap:attribute> </adm:profile> </adm:property> <adm:property name="conflicts-historical-purge-delay"> <adm:synopsis> This delay indicates the time (in minutes) the domain keeps the historical information necessary to solve conflicts. When a change stored in the historical part of the user entry has a date (from its replication ChangeNumber) older than this delay, it is a candidate to be purged. The purge is applied on two events: a modification of the entry, or a dedicated purge task. </adm:synopsis> <adm:default-behavior> <adm:defined> <adm:value>1440m</adm:value> </adm:defined> </adm:default-behavior> <adm:syntax> <adm:duration base-unit="m" allow-unlimited="false" /> </adm:syntax> <adm:profile name="ldap"> <ldap:attribute> <ldap:name>ds-cfg-conflicts-historical-purge-delay</ldap:name> </ldap:attribute> </adm:profile> </adm:property> </adm:managed-object> opends/src/admin/messages/ReplicationDomainCfgDefn.properties
@@ -13,6 +13,7 @@ property.base-dn.synopsis=Specifies the base DN of the replicated data. property.changetime-heartbeat-interval.synopsis=Specifies the heart-beat interval that the Directory Server will use when sending its local change time to the Replication Server. property.changetime-heartbeat-interval.description=The Directory Server sends a regular heart-beat to the Replication within the specified interval. The heart-beat indicates the change time of the Directory Server to the Replication Server. property.conflicts-historical-purge-delay.synopsis=This delay indicates the time (in minutes) the domain keeps the historical information necessary to solve conflicts. When a change stored in the historical part of the user entry has a date (from its replication ChangeNumber) older than this delay, it is a candidate to be purged. The purge is applied on two events: a modification of the entry, or a dedicated purge task. property.fractional-exclude.synopsis=Allows to exclude some attributes to replicate to this server. property.fractional-exclude.description=If fractional-exclude configuration attribute is used, attributes specified in this attribute will be ignored (not added/modified/deleted) when an operation performed from another directory server is being replayed in the local server. Note that the usage of this configuration attribute is mutually exclusive with the usage of the fractional-include attribute. property.fractional-exclude.syntax.string.pattern.synopsis=Defines attribute(s) of one particular class or of all possible classes, to exclude from the replication. opends/src/server/org/opends/server/config/ConfigConstants.java
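Aside for reviewers: the new conflicts-historical-purge-delay property documented above can be set with dsconfig, mirroring the call made in HistoricalTest further below. A minimal sketch, assuming a replication domain named example-domain and a 30-minute delay (both placeholders); connection options (host, port, credentials) are omitted:

dsconfig set-replication-domain-prop \
  --provider-name "Multimaster Synchronization" \
  --domain-name example-domain \
  --set conflicts-historical-purge-delay:30m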
@@ -4370,5 +4370,48 @@ */ public static final String ATTR_IMPORT_CLEAR_BACKEND = NAME_PREFIX_TASK + "import-clear-backend"; /** * The name of the attribute in a purge conflicts historical task definition * that specifies the base DN of the synchronization domain to purge. */ public static final String ATTR_TASK_CONFLICTS_HIST_PURGE_DOMAIN_DN = NAME_PREFIX_TASK + "purge-conflicts-historical-domain-dn"; /** * The name of the attribute in a purge conflicts historical task definition * that specifies the maximum duration of the task. */ public static final String ATTR_TASK_CONFLICTS_HIST_PURGE_MAX_DURATION = NAME_PREFIX_TASK + "purge-conflicts-historical-maximum-duration"; /** * The name of the attribute in a purge conflicts historical task definition * that records the first change number purged by the task. */ public static final String ATTR_TASK_CONFLICTS_HIST_PURGE_FIRST_CN = NAME_PREFIX_TASK + "purge-conflicts-historical-first-purged-changenumber"; /** * The name of the attribute in a purge conflicts historical task definition * that records the last change number purged by the task. */ public static final String ATTR_TASK_CONFLICTS_HIST_PURGE_LAST_CN = NAME_PREFIX_TASK + "purge-conflicts-historical-last-purged-changenumber"; /** * The name of the attribute in a purge conflicts historical task definition * that indicates whether the purge completed within the maximum duration. */ public static final String ATTR_TASK_CONFLICTS_HIST_PURGE_COMPLETED_IN_TIME = NAME_PREFIX_TASK + "purge-conflicts-historical-purge-completed-in-time"; /** * The name of the attribute in a purge conflicts historical task definition * that records the number of historical values purged by the task. */ public static final String ATTR_TASK_CONFLICTS_HIST_PURGE_COUNT = NAME_PREFIX_TASK + "purge-conflicts-historical-purged-values-count"; } opends/src/server/org/opends/server/replication/plugin/EntryHistorical.java
@@ -46,6 +46,7 @@ import org.opends.server.types.operation.PreOperationAddOperation; import org.opends.server.types.operation.PreOperationModifyDNOperation; import org.opends.server.types.operation.PreOperationModifyOperation; import org.opends.server.util.TimeThread; /** * This class is used to store historical information that is @@ -83,6 +84,33 @@ */ public static final String ENTRYUIDNAME = "entryuuid"; /* The delay to purge the historical informations * This delay indicates the time the domain keeps the historical * information necessary to solve conflicts.When a change stored in the * historical part of the user entry has a date (from its replication * ChangeNumber) older than this delay, it is candidate to be purged. * The purge is triggered on 2 events: modify of the entry, dedicated purge * task. * * The purge is done when the historical is encoded. */ private long purgeDelayInMillisec = -1; /* * The oldest ChangeNumber stored in this entry historical attribute. * null when this historical object has been created from * an entry that has no historical attribute and after the last * historical has been purged. */ private ChangeNumber oldestChangeNumber = null; /** * For stats/monitoring purpose, the number of historical values * purged the last time a purge has been applied on this entry historical. */ private int lastPurgedValuesCount = 0; /** * The in-memory historical information is made of. * @@ -140,7 +168,7 @@ public String toString() { StringBuilder builder = new StringBuilder(); builder.append(encode()); builder.append(encodeAndPurge()); return builder.toString(); } @@ -224,7 +252,7 @@ // // - add the modification of the ds-sync-hist attribute, // to the current modifications of the MOD operation Attribute attr = encode(); Attribute attr = encodeAndPurge(); Modification mod = new Modification(ModificationType.REPLACE, attr); mods.add(mod); // - update the already modified entry @@ -251,7 +279,7 @@ Entry modifiedEntry = modifyDNOperation.getUpdatedEntry(); List<Modification> mods = modifyDNOperation.getModifications(); Attribute attr = encode(); Attribute attr = encodeAndPurge(); // Now do the 2 updates required by the core to be consistent: // @@ -394,20 +422,40 @@ } /** * Encode this historical information object in an operational attribute. * For stats/monitoring purpose, returns the number of historical values * purged the last time a purge has been applied on this entry historical. * * @return the purged values count. */ public int getLastPurgedValuesCount() { return this.lastPurgedValuesCount; } /** * Encode this historical information object in an operational attribute * and purge it from the values older than the purge delay. * * @return The historical information encoded in an operational attribute. */ public Attribute encode() public Attribute encodeAndPurge() { long purgeDate = 0; // Set the stats counter to 0 and compute the purgeDate to now minus // the potentially set purge delay. this.lastPurgedValuesCount = 0; if (purgeDelayInMillisec>0) purgeDate = TimeThread.getTime() - purgeDelayInMillisec; AttributeType historicalAttrType = DirectoryServer.getSchema().getAttributeType(HISTORICALATTRIBUTENAME); AttributeBuilder builder = new AttributeBuilder(historicalAttrType); // Encode the historical information for modify operation. 
for (Map.Entry<AttributeType, AttrHistoricalWithOptions> entryWithOptions : attributesHistorical.entrySet()) { // Encode an attribute type AttributeType type = entryWithOptions.getKey(); HashMap<Set<String> , AttrHistorical> attrwithoptions = entryWithOptions.getValue().getAttributesInfo(); @@ -415,10 +463,11 @@ for (Map.Entry<Set<String>, AttrHistorical> entry : attrwithoptions.entrySet()) { // Encode an (attribute type/option) boolean delAttr = false; Set<String> options = entry.getKey(); String optionsString = ""; AttrHistorical info = entry.getValue(); AttrHistorical attrHist = entry.getValue(); if (options != null) @@ -432,49 +481,63 @@ optionsString = optionsBuilder.toString(); } ChangeNumber deleteTime = info.getDeleteTime(); ChangeNumber deleteTime = attrHist.getDeleteTime(); /* generate the historical information for deleted attributes */ if (deleteTime != null) { delAttr = true; } /* generate the historical information for modified attribute values */ for (AttrValueHistorical valInfo : info.getValuesHistorical()) for (AttrValueHistorical attrValHist : attrHist.getValuesHistorical()) { // Encode an attribute value String strValue; if (valInfo.getValueDeleteTime() != null) if (attrValHist.getValueDeleteTime() != null) { // this hist must be purged now, so skip its encoding if ((purgeDelayInMillisec>0) && (attrValHist.getValueDeleteTime().getTime()<=purgeDate)) { this.lastPurgedValuesCount++; continue; } strValue = type.getNormalizedPrimaryName() + optionsString + ":" + valInfo.getValueDeleteTime().toString() + ":del:" + valInfo.getAttributeValue().toString(); attrValHist.getValueDeleteTime().toString() + ":del:" + attrValHist.getAttributeValue().toString(); AttributeValue val = AttributeValues.create(historicalAttrType, strValue); builder.add(val); } else if (valInfo.getValueUpdateTime() != null) else if (attrValHist.getValueUpdateTime() != null) { if ((delAttr && valInfo.getValueUpdateTime() == deleteTime) && (valInfo.getAttributeValue() != null)) if ((purgeDelayInMillisec>0) && (attrValHist.getValueUpdateTime().getTime()<=purgeDate)) { // this hist must be purged now, so skip its encoding this.lastPurgedValuesCount++; continue; } if ((delAttr && attrValHist.getValueUpdateTime() == deleteTime) && (attrValHist.getAttributeValue() != null)) { strValue = type.getNormalizedPrimaryName() + optionsString + ":" + valInfo.getValueUpdateTime().toString() + ":repl:" + valInfo.getAttributeValue().toString(); attrValHist.getValueUpdateTime().toString() + ":repl:" + attrValHist.getAttributeValue().toString(); delAttr = false; } else { if (valInfo.getAttributeValue() == null) if (attrValHist.getAttributeValue() == null) { strValue = type.getNormalizedPrimaryName() + optionsString + ":" + valInfo.getValueUpdateTime().toString() + + ":" + attrValHist.getValueUpdateTime().toString() + ":add"; } else { strValue = type.getNormalizedPrimaryName() + optionsString + ":" + valInfo.getValueUpdateTime().toString() + ":add:" + valInfo.getAttributeValue().toString(); + ":" + attrValHist.getValueUpdateTime().toString() + ":add:" + attrValHist.getAttributeValue().toString(); } } @@ -486,6 +549,14 @@ if (delAttr) { // Stores the attr deletion hist when not older than the purge delay if ((purgeDelayInMillisec>0) && (deleteTime.getTime()<=purgeDate)) { // this hist must be purged now, so skip its encoding this.lastPurgedValuesCount++; continue; } String strValue = type.getNormalizedPrimaryName() + optionsString + ":" + deleteTime.toString() + ":attrDel"; @@ -499,19 +570,48 @@ // Encode the historical information 
for the ADD Operation. if (entryADDDate != null) { builder.add(encodeAddHistorical(entryADDDate)); // Stores the ADDDate when not older than the purge delay if ((purgeDelayInMillisec>0) && (entryADDDate.getTime()<=purgeDate)) { this.lastPurgedValuesCount++; } else { builder.add(encodeAddHistorical(entryADDDate)); } } // Encode the historical information for the MODDN Operation. if (entryMODDNDate != null) { builder.add(encodeMODDNHistorical(entryMODDNDate)); // Stores the MODDNDate when not older than the purge delay if ((purgeDelayInMillisec>0) && (entryMODDNDate.getTime()<=purgeDate)) { this.lastPurgedValuesCount++; } else { builder.add(encodeMODDNHistorical(entryMODDNDate)); } } return builder.toAttribute(); } /** * Set the delay to purge the historical informations. The purge is applied * only when historical attribute is updated (write operations). * * @param purgeDelay the purge delay in ms */ public void setPurgeDelay(long purgeDelay) { this.purgeDelayInMillisec = purgeDelay; } /** * Indicates if the Entry was renamed or added after the ChangeNumber * that is given as a parameter. * @@ -602,6 +702,9 @@ AttributeValue value = histVal.getAttributeValue(); HistAttrModificationKey histKey = histVal.getHistKey(); // update the oldest ChangeNumber stored in the new entry historical newHistorical.updateOldestCN(cn); if (histVal.isADDOperation()) { newHistorical.entryADDDate = cn; @@ -834,5 +937,36 @@ return attrType.getNameOrOID().equals(EntryHistorical.HISTORICALATTRIBUTENAME); } /** * Potentially update the oldest ChangeNumber stored in this entry historical * with the provided ChangeNumber when its older than the current oldest. * * @param cn the provided ChangeNumber. */ private void updateOldestCN(ChangeNumber cn) { if (cn != null) { if (this.oldestChangeNumber == null) this.oldestChangeNumber = cn; else if (cn.older(this.oldestChangeNumber)) this.oldestChangeNumber = cn; } } /** * Returns the oldest ChangeNumber stored in this entry historical attribute. * * @return the oldest ChangeNumber stored in this entry historical attribute. * Returns null when this historical object has been created from * an entry that has no historical attribute and after the last * historical has been purged. */ public ChangeNumber getOldestCN() { return this.oldestChangeNumber; } } opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
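Before the LDAPReplicationDomain changes, here is a minimal usage sketch of the new EntryHistorical purge API added above; the purge loop in LDAPReplicationDomain.purgeConflictsHistorical below follows the same sequence. The "entry" variable and the one-day delay are placeholders for illustration, not part of the patch:

// Sketch only: assumes "entry" has already been read from the backend.
EntryHistorical hist = EntryHistorical.newInstanceFromEntry(entry);
hist.setPurgeDelay(24L * 60 * 60 * 1000);           // purge delay, in milliseconds
Attribute purged = hist.encodeAndPurge();           // re-encode ds-sync-hist, dropping expired values
int removedCount = hist.getLastPurgedValuesCount(); // number of values dropped by this purge
ChangeNumber oldest = hist.getOldestCN();           // null if the entry carried no historical values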
@@ -24,6 +24,7 @@ * * Copyright 2006-2010 Sun Microsystems, Inc. */ package org.opends.server.replication.plugin; import static org.opends.messages.ReplicationMessages.*; @@ -62,9 +63,13 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.zip.DataFormatException; import java.util.Date; import org.opends.server.util.TimeThread; import org.opends.messages.Category; import org.opends.messages.Message; import org.opends.messages.MessageBuilder; import org.opends.messages.Severity; import org.opends.server.admin.server.ConfigurationChangeListener; import org.opends.server.admin.std.meta.ReplicationDomainCfgDefn.*; import org.opends.server.admin.std.server.ExternalChangelogDomainCfg; @@ -162,6 +167,7 @@ import org.opends.server.util.LDIFReader; import org.opends.server.workflowelement.externalchangelog.ECLWorkflowElement; import org.opends.server.workflowelement.localbackend.*; import org.opends.server.tasks.PurgeConflictsHistoricalTask; /** * This class implements the bulk part of the.of the Directory Server side @@ -239,7 +245,7 @@ */ public static final String DS_SYNC_CONFLICT = "ds-sync-conflict"; /** /** * The tracer object for the debug logger. */ private static final DebugTracer TRACER = getTracer(); @@ -408,6 +414,18 @@ // ChangeNumbers. private boolean logChangeNumber = false; // This configuration integer indicates the time the domain keeps the // historical information necessary to solve conflicts. // When a change stored in the historical part of the user entry has a date // (from its replication ChangeNumber) older than this delay, it is candidate // to be purged. private long histPurgeDelayInMilliSec = 0; // The last change number purged in this domain. Allows to have a continuous // purging process from one purge processing (task run) to the next one. // Values 0 when the server starts. private ChangeNumber lastChangeNumberPurgedFromHist = new ChangeNumber(0,0,0); /** * The thread that periodically saves the ServerState of this * LDAPReplicationDomain in the database. @@ -547,14 +565,17 @@ // Read the configuration parameters. Set<String> replicationServers = configuration.getReplicationServer(); serverId = configuration.getServerId(); baseDn = configuration.getBaseDN(); this.serverId = configuration.getServerId(); this.baseDn = configuration.getBaseDN(); int window = configuration.getWindowSize(); heartbeatInterval = configuration.getHeartbeatInterval(); isolationpolicy = configuration.getIsolationPolicy(); configDn = configuration.dn(); logChangeNumber = configuration.isLogChangenumber(); this.isolationpolicy = configuration.getIsolationPolicy(); this.configDn = configuration.dn(); this.logChangeNumber = configuration.isLogChangenumber(); this.updateToReplayQueue = updateToReplayQueue; this.histPurgeDelayInMilliSec = configuration.getConflictsHistoricalPurgeDelay()*60*1000; // Get assured configuration readAssuredConfig(configuration, false); @@ -2423,6 +2444,12 @@ * Check if the operation that just happened has cleared a conflict : * Clearing a conflict happens if the operation has free a DN that * for which an other entry was in conflict. * Steps: * - get the DN freed by a DELETE or MODRDN op * - search for entries put in the conflict space (dn=entryuid'+'....) 
* because the expected DN was not available (ds-sync-conflict=expected DN) * - retain the entry with the oldest conflict * - rename this entry with the freedDN as it was expected originally */ private void checkForClearedConflict(PostOperationOperation op) { @@ -2433,14 +2460,14 @@ return; } DN targetDN; DN freedDN; if (type == OperationType.DELETE) { targetDN = ((PostOperationDeleteOperation) op).getEntryDN(); freedDN = ((PostOperationDeleteOperation) op).getEntryDN(); } else if (type == OperationType.MODIFY_DN) { targetDN = ((PostOperationModifyDNOperation) op).getEntryDN(); freedDN = ((PostOperationModifyDNOperation) op).getEntryDN(); } else { @@ -2451,7 +2478,7 @@ try { filter = LDAPFilter.decode( DS_SYNC_CONFLICT + "=" + targetDN.toNormalizedString()); DS_SYNC_CONFLICT + "=" + freedDN.toNormalizedString()); } catch (LDAPException e) { // Not possible. We know the filter just above is correct. @@ -2491,7 +2518,7 @@ { DN entryDN = entrytoRename.getDN(); ModifyDNOperationBasis newOp = renameEntry( entryDN, targetDN.getRDN(), targetDN.getParent(), false); entryDN, freedDN.getRDN(), freedDN.getParent(), false); ResultCode res = newOp.getResultCode(); if (res != ResultCode.SUCCESS) @@ -4410,6 +4437,8 @@ { isolationpolicy = configuration.getIsolationPolicy(); logChangeNumber = configuration.isLogChangenumber(); histPurgeDelayInMilliSec = configuration.getConflictsHistoricalPurgeDelay()*60*1000; changeConfig( configuration.getReplicationServer(), @@ -5588,4 +5617,122 @@ { return this.eclDomain.isEnabled(); } /** * Return the purge delay (in ms) for the historical information stored * in entries to solve conflicts for this domain. * * @return the purge delay. */ public long getHistoricalPurgeDelay() { return histPurgeDelayInMilliSec; } /** * Purge the historical information stored in the entries of this domain * to solve conflicts. Historical values whose date (taken from their * replication ChangeNumber) is older than the configured purge delay are * removed from the entries. The purge resumes from the last change number * purged by a previous run, and stops either when all eligible values have * been purged or when the end date is reached. * * @param task the task raising this purge. * @param endDate the date to stop this task whether the job is done or not. * @throws DirectoryException when an exception happens. * */ public void purgeConflictsHistorical(PurgeConflictsHistoricalTask task, long endDate) throws DirectoryException { LDAPFilter filter = null; TRACER.debugInfo("[PURGE] purgeConflictsHistorical " + "on domain: " + baseDn + " endDate: " + new Date(endDate) + " lastChangeNumberPurgedFromHist: " + lastChangeNumberPurgedFromHist.toStringUI()); try { filter = LDAPFilter.decode( "(" + EntryHistorical.HISTORICALATTRIBUTENAME + ">=dummy:" + lastChangeNumberPurgedFromHist + ")"); } catch (LDAPException e) { // Not possible. We know the filter just above is correct.
} LinkedHashSet<String> attrs = new LinkedHashSet<String>(1); attrs.add(EntryHistorical.HISTORICALATTRIBUTENAME); attrs.add(EntryHistorical.ENTRYUIDNAME); attrs.add("*"); InternalSearchOperation searchOp = conn.processSearch( ByteString.valueOf(baseDn.toString()), SearchScope.WHOLE_SUBTREE, DereferencePolicy.NEVER_DEREF_ALIASES, 0, 0, false, filter, attrs, null); int count = 0; task.setProgressStats(lastChangeNumberPurgedFromHist, count); LinkedList<SearchResultEntry> entries = searchOp.getSearchEntries(); for (SearchResultEntry entry : entries) { long maxTimeToRun = endDate - TimeThread.getTime(); if (maxTimeToRun<0) { Message errMsg = Message.raw(Category.SYNC, Severity.NOTICE, " end date reached"); DirectoryException de = new DirectoryException( ResultCode.ADMIN_LIMIT_EXCEEDED, errMsg); throw (de); } EntryHistorical entryHist = EntryHistorical.newInstanceFromEntry(entry); lastChangeNumberPurgedFromHist = entryHist.getOldestCN(); entryHist.setPurgeDelay(this.histPurgeDelayInMilliSec); Attribute attr = entryHist.encodeAndPurge(); count += entryHist.getLastPurgedValuesCount(); List<Modification> mods = new LinkedList<Modification>(); Modification mod; mod = new Modification(ModificationType.REPLACE, attr); mods.add(mod); ModifyOperationBasis newOp = new ModifyOperationBasis( conn, InternalClientConnection.nextOperationID(), InternalClientConnection.nextMessageID(), new ArrayList<Control>(0), entry.getDN(), mods); newOp.setInternalOperation(true); newOp.setSynchronizationOperation(true); newOp.setDontSynchronize(true); newOp.run(); if (newOp.getResultCode() != ResultCode.SUCCESS) { // Log information for the repair tool. MessageBuilder mb = new MessageBuilder(); mb.append(ERR_CANNOT_ADD_CONFLICT_ATTRIBUTE.get()); mb.append(String.valueOf(newOp)); mb.append(" "); mb.append(String.valueOf(newOp.getResultCode())); logError(mb.toMessage()); } else { task.setProgressStats(lastChangeNumberPurgedFromHist, count); } } } } opends/src/server/org/opends/server/replication/plugin/MultimasterReplication.java
@@ -503,6 +503,8 @@ historicalInformation); } historicalInformation.setPurgeDelay(domain.getHistoricalPurgeDelay()); historicalInformation.setHistoricalAttrToOperation(modifyOperation); if (modifyOperation.getModifications().isEmpty()) @@ -556,6 +558,8 @@ historicalInformation); } historicalInformation.setPurgeDelay(domain.getHistoricalPurgeDelay()); // Add to the operation the historical attribute : "dn:changeNumber:moddn" historicalInformation.setHistoricalAttrToOperation(modifyDNOperation); opends/src/server/org/opends/server/tasks/PurgeConflictsHistoricalTask.java
New file @@ -0,0 +1,286 @@ /* * CDDL HEADER START * * The contents of this file are subject to the terms of the * Common Development and Distribution License, Version 1.0 only * (the "License"). You may not use this file except in compliance * with the License. * * You can obtain a copy of the license at * trunk/opends/resource/legal-notices/OpenDS.LICENSE * or https://OpenDS.dev.java.net/OpenDS.LICENSE. * See the License for the specific language governing permissions * and limitations under the License. * * When distributing Covered Code, include this CDDL HEADER in each * file and include the License file at * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable, * add the following below this CDDL HEADER, with the fields enclosed * by brackets "[]" replaced with your own identifying information: * Portions Copyright [yyyy] [name of copyright owner] * * CDDL HEADER END * * * Copyright 2006-2010 Sun Microsystems, Inc. */ package org.opends.server.tasks; import org.opends.server.replication.plugin.LDAPReplicationDomain; import org.opends.server.types.ResultCode; import org.opends.messages.MessageBuilder; import org.opends.messages.Message; import org.opends.messages.Category; import org.opends.messages.Severity; import static org.opends.server.config.ConfigConstants.*; import static org.opends.server.core.DirectoryServer.getAttributeType; import static org.opends.server.loggers.debug.DebugLogger.debugEnabled; import static org.opends.server.loggers.debug.DebugLogger.getTracer; import java.util.List; import org.opends.messages.TaskMessages; import org.opends.server.backends.task.Task; import org.opends.server.backends.task.TaskState; import org.opends.server.loggers.debug.DebugTracer; import org.opends.server.types.Attribute; import org.opends.server.types.AttributeType; import org.opends.server.types.DN; import org.opends.server.types.DirectoryException; import org.opends.server.types.Entry; import org.opends.server.util.TimeThread; import org.opends.server.replication.common.ChangeNumber; /** * This class provides an implementation of a Directory Server task that can * be used to purge the replication historical informations stored in the * user entries to solve conflicts. */ public class PurgeConflictsHistoricalTask extends Task { /** * The tracer object for the debug logger. */ private static final DebugTracer TRACER = getTracer(); private String domainString = null; private LDAPReplicationDomain domain = null; // The last changeNumber purged : help the user to know how well the purge // processing has done its job. // We want to help the user know: // - the task has started at dateX time, will end at dateY max // and is currently purging dateZ long currentCNPurgedDate = 0; long taskMaxEndDate = 0; /** * current historical purge delay * <---------------------------------> * -----------------------------------------------------------------> t * | | | * current task task * CN being purged start date max end date * <------------> * config.purgeMaxDuration * * The task will start purging the change with the oldest CN found. 
* The task run as long as : * - the end date (computed from the configured max duration) is not reached * - the CN purged is oldest than the configured historical purge delay * * */ long purgeTaskMaxDurationInSec = 3600; // Default:1h TaskState initState; private static final void debugInfo(String s) { if (debugEnabled()) { System.out.println(Message.raw(Category.SYNC, Severity.NOTICE, s)); TRACER.debugInfo(s); } } /** * {@inheritDoc} */ public Message getDisplayName() { return TaskMessages.INFO_TASK_PURGE_CONFLICTS_HIST_NAME.get(); } /** * {@inheritDoc} */ @Override public void initializeTask() throws DirectoryException { if (TaskState.isDone(getTaskState())) { return; } // FIXME -- Do we need any special authorization here? Entry taskEntry = getTaskEntry(); AttributeType typeDomainBase; typeDomainBase = getAttributeType(ATTR_TASK_CONFLICTS_HIST_PURGE_DOMAIN_DN, true); List<Attribute> attrList; attrList = taskEntry.getAttribute(typeDomainBase); domainString = TaskUtils.getSingleValueString(attrList); try { DN dn = DN.decode(domainString); // We can assume that this is an LDAP replication domain domain = LDAPReplicationDomain.retrievesReplicationDomain(dn); } catch(DirectoryException e) { MessageBuilder mb = new MessageBuilder(); mb.append(TaskMessages.ERR_TASK_INITIALIZE_INVALID_DN.get()); mb.append(e.getMessage()); throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, mb.toMessage()); } AttributeType typeMaxDuration; typeMaxDuration = getAttributeType(ATTR_TASK_CONFLICTS_HIST_PURGE_MAX_DURATION, true); attrList = taskEntry.getAttribute(typeMaxDuration); String maxDurationStringInSec = TaskUtils.getSingleValueString(attrList); if (maxDurationStringInSec != null) { try { purgeTaskMaxDurationInSec = Long.decode(maxDurationStringInSec); } catch(Exception e) { throw new DirectoryException( ResultCode.UNWILLING_TO_PERFORM, TaskMessages.ERR_TASK_INVALID_ATTRIBUTE_VALUE.get( ATTR_TASK_CONFLICTS_HIST_PURGE_MAX_DURATION, e.getLocalizedMessage())); } } } /** * {@inheritDoc} */ protected TaskState runTask() { Boolean purgeCompletedInTime = false; if (debugEnabled()) { debugInfo("[PURGE] PurgeConflictsHistoricalTask is starting " + "on domain: " + domain.getServiceID() + "max duration (sec):" + purgeTaskMaxDurationInSec); } try { replaceAttributeValue(ATTR_TASK_CONFLICTS_HIST_PURGE_COMPLETED_IN_TIME, purgeCompletedInTime.toString()); // launch the task domain.purgeConflictsHistorical(this, TimeThread.getTime() + (purgeTaskMaxDurationInSec*1000)); purgeCompletedInTime = true; replaceAttributeValue(ATTR_TASK_CONFLICTS_HIST_PURGE_COMPLETED_IN_TIME, purgeCompletedInTime.toString()); initState = TaskState.COMPLETED_SUCCESSFULLY; } catch(DirectoryException de) { debugInfo("[PURGE] PurgeConflictsHistoricalTask exception " + de.getLocalizedMessage()); if (de.getResultCode() != ResultCode.ADMIN_LIMIT_EXCEEDED) { // Error raised at submission time logError(de.getMessageObject()); initState = TaskState.STOPPED_BY_ERROR; } else { initState = TaskState.COMPLETED_SUCCESSFULLY; } } finally { try { // sets in the attributes the last stats values replaceAttributeValue(ATTR_TASK_CONFLICTS_HIST_PURGE_COUNT, String.valueOf(this.purgeCount)); replaceAttributeValue(ATTR_TASK_CONFLICTS_HIST_PURGE_LAST_CN, this.lastCN.toStringUI()); debugInfo("[PURGE] PurgeConflictsHistoricalTask write attrs "); } catch(Exception e) { debugInfo("[PURGE] PurgeConflictsHistoricalTask exception " + e.getLocalizedMessage()); initState = TaskState.STOPPED_BY_ERROR; } } if (debugEnabled()) { debugInfo("[PURGE] PurgeConflictsHistoricalTask 
is ending " + "with state:" + initState.toString() + " completedInTime:" + purgeCompletedInTime); } return initState; } int updateAttrPeriod = 0; ChangeNumber lastCN; int purgeCount; /** * Set the last changenumber purged and the count of purged values in order * to monitor the historical purge. * @param lastCN the last changeNumber purged. * @param purgeCount the count of purged values. */ public void setProgressStats(ChangeNumber lastCN, int purgeCount) { try { if (purgeCount == 0) replaceAttributeValue(ATTR_TASK_CONFLICTS_HIST_PURGE_FIRST_CN, lastCN.toStringUI()); // we don't want the update of the task to overload too much task duration this.purgeCount = purgeCount; this.lastCN = lastCN; if (++updateAttrPeriod % 100 == 0) { replaceAttributeValue(ATTR_TASK_CONFLICTS_HIST_PURGE_COUNT, String.valueOf(purgeCount)); replaceAttributeValue(ATTR_TASK_CONFLICTS_HIST_PURGE_LAST_CN, lastCN.toStringUI()); debugInfo("[PURGE] PurgeConflictsHistoricalTask write attrs " + purgeCount); } } catch(DirectoryException de) { debugInfo("[PURGE] PurgeConflictsHistoricalTask exception " + de.getLocalizedMessage()); initState = TaskState.STOPPED_BY_ERROR; } } } opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/DomainFakeCfg.java
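For reviewers, a hedged example of how the PurgeConflictsHistoricalTask above is expected to be scheduled, mirroring the task entry built in HistoricalTest below; the task id, domain DN, and maximum duration (in seconds) are placeholders:

dn: ds-task-id=purge-hist-example,cn=Scheduled Tasks,cn=Tasks
objectclass: top
objectclass: ds-task
objectclass: ds-task-purge-conflicts-historical
ds-task-class-name: org.opends.server.tasks.PurgeConflictsHistoricalTask
ds-task-purge-conflicts-historical-domain-dn: dc=example,dc=com
ds-task-purge-conflicts-historical-maximum-duration: 600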
@@ -492,4 +492,18 @@ { return true; } /** * Gets the "conflicts-historical-purge-delay" property. * <p> * This delay indicates the time (in minutes) the domain keeps the * historical information necessary to solve conflicts. * * @return Returns the value of the "conflicts-historical-purge-delay" property. **/ public long getConflictsHistoricalPurgeDelay() { return 1440; } } opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/HistoricalTest.java
@@ -34,28 +34,44 @@ import org.opends.server.replication.common.ChangeNumber; import org.opends.server.replication.protocol.AddMsg; import org.opends.server.replication.protocol.ModifyMsg; import org.opends.server.schema.DirectoryStringSyntax; import org.opends.server.TestCaseUtils; import org.opends.server.protocols.internal.InternalClientConnection; import org.opends.server.protocols.internal.InternalSearchOperation; import org.opends.server.protocols.ldap.LDAPFilter; import org.opends.server.tools.LDAPModify; import org.opends.server.types.AbstractOperation; import org.opends.server.types.Attributes; import org.opends.server.types.ByteString; import org.opends.server.types.DN; import org.opends.server.types.Entry; import org.opends.server.types.Attribute; import org.opends.server.types.Modification; import org.opends.server.types.ModificationType; import org.opends.server.types.AttributeType; import org.opends.server.types.ResultCode; import org.opends.server.types.SearchResultEntry; import org.opends.server.types.SearchScope; import org.opends.server.types.operation.PluginOperation; import org.opends.server.backends.task.TaskBackend; import org.opends.server.backends.task.TaskState; import org.opends.server.core.DirectoryServer; import org.testng.annotations.Test; import org.testng.annotations.BeforeClass; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; import static org.opends.messages.ReplicationMessages.ERR_NO_MATCHING_DOMAIN; import static org.opends.server.TestCaseUtils.*; import static org.opends.server.config.ConfigConstants.ATTR_TASK_CONFLICTS_HIST_PURGE_COUNT; import static org.opends.server.config.ConfigConstants.ATTR_TASK_CONFLICTS_HIST_PURGE_COMPLETED_IN_TIME; import java.net.ServerSocket; import java.util.LinkedList; import java.util.List; import java.util.ArrayList; import java.util.UUID; /** * Tests the Historical class. @@ -64,6 +80,7 @@ extends ReplicationTestCase { private int replServerPort; String testName = "historicalTest"; /** * Set up replication on the test backend. @@ -96,7 +113,6 @@ replServerEntry = TestCaseUtils.entryFromLdifString(replServerLdif); // The suffix to be synchronized. String testName = "historicalTest"; String synchroServerStringDN = "cn=" + testName + ", cn=domains, " + SYNCHRO_PLUGIN_DN; String synchroServerLdif = "dn: " + synchroServerStringDN + "\n" @@ -115,66 +131,68 @@ /** * Tests that the attribute modification history is correctly read from * and written to an operational attribute of the entry. * Also test that historical is purged according to the purge delay that * is provided. * @throws Exception If the test fails. */ @Test public void testEncoding() @Test(enabled=true) public void testEncodingAndPurge() throws Exception { // Add a test entry. TestCaseUtils.addEntry( "dn: uid=user.1," + TEST_ROOT_DN_STRING, "objectClass: top", "objectClass: person", "objectClass: organizationalPerson", "objectClass: inetOrgPerson", "uid: user.1", "cn: Aaccf Amar", "sn: Amar", "givenName: Aaccf", "userPassword: password", "description: Initial description", "displayName: 1" ); "dn: uid=user.1," + TEST_ROOT_DN_STRING, "objectClass: top", "objectClass: person", "objectClass: organizationalPerson", "objectClass: inetOrgPerson", "uid: user.1", "cn: Aaccf Amar", "sn: Amar", "givenName: Aaccf", "userPassword: password", "description: Initial description", "displayName: 1" ); // Modify the test entry to give it some history. // Test both single and multi-valued attributes. 
String path = TestCaseUtils.createTempFile( "dn: uid=user.1," + TEST_ROOT_DN_STRING, "changetype: modify", "add: cn;lang-en", "cn;lang-en: Aaccf Amar", "cn;lang-en: Aaccf A Amar", "-", "replace: description", "description: replaced description", "-", "add: displayName", "displayName: 2", "-", "delete: displayName", "displayName: 1", "-" "dn: uid=user.1," + TEST_ROOT_DN_STRING, "changetype: modify", "add: cn;lang-en", "cn;lang-en: Aaccf Amar", "cn;lang-en: Aaccf A Amar", "-", "replace: description", "description: replaced description", "-", "add: displayName", "displayName: 2", "-", "delete: displayName", "displayName: 1", "-" ); String[] args = { "-h", "127.0.0.1", "-p", String.valueOf(TestCaseUtils.getServerLdapPort()), "-D", "cn=Directory Manager", "-w", "password", "-f", path "-h", "127.0.0.1", "-p", String.valueOf(TestCaseUtils.getServerLdapPort()), "-D", "cn=Directory Manager", "-w", "password", "-f", path }; assertEquals(LDAPModify.mainModify(args, false, null, System.err), 0); args[9] = TestCaseUtils.createTempFile( "dn: uid=user.1," + TEST_ROOT_DN_STRING, "changetype: modify", "replace: displayName", "displayName: 2", "-" "dn: uid=user.1," + TEST_ROOT_DN_STRING, "changetype: modify", "replace: displayName", "displayName: 2", "-" ); assertEquals(LDAPModify.mainModify(args, false, null, System.err), 0); @@ -188,9 +206,47 @@ // Check that encoding and decoding preserves the history information. EntryHistorical hist = EntryHistorical.newInstanceFromEntry(entry); Attribute after = hist.encode(); Attribute after = hist.encodeAndPurge(); assertEquals(hist.getLastPurgedValuesCount(),0); assertEquals(after, before); LDAPReplicationDomain domain = MultimasterReplication.findDomain( DN.decode("uid=user.1," + TEST_ROOT_DN_STRING), null); Thread.sleep(1000); args[9] = TestCaseUtils.createTempFile( "dn: uid=user.1," + TEST_ROOT_DN_STRING, "changetype: modify", "replace: displayName", "displayName: 3", "-" ); assertEquals(LDAPModify.mainModify(args, false, null, System.err), 0); long testPurgeDelayInMillisec = 1000; // 1 sec // Read the entry back to get its history operational attribute. entry = DirectoryServer.getEntry(dn); hist = EntryHistorical.newInstanceFromEntry(entry); hist.setPurgeDelay(testPurgeDelayInMillisec); after = hist.encodeAndPurge(); // The purge time is not done so the hist attribute should be not empty assertTrue(!after.isEmpty()); // Now wait for the purge time to be done Thread.sleep(testPurgeDelayInMillisec); // Read the entry back to get its history operational attribute. // The hist attribute should now be empty since purged entry = DirectoryServer.getEntry(dn); hist = EntryHistorical.newInstanceFromEntry(entry); hist.setPurgeDelay(testPurgeDelayInMillisec); after = hist.encodeAndPurge(); assertTrue(after.isEmpty()); assertEquals(hist.getLastPurgedValuesCount(),11); } /** @@ -263,11 +319,12 @@ String entryuuid2 = attrs.get(0).iterator().next().getValue().toString(); long now = System.currentTimeMillis(); // A change on a first server. ChangeNumber t1 = new ChangeNumber(1, 0, 3); ChangeNumber t1 = new ChangeNumber(now, 0, 3); // A change on a second server. ChangeNumber t2 = new ChangeNumber(2, 0, 4); ChangeNumber t2 = new ChangeNumber(now+1, 0, 4); // Simulate the ordering t1:add:A followed by t2:add:B that would // happen on one server. @@ -294,8 +351,8 @@ // Simulate the reverse ordering t2:add:B followed by t1:add:A that // would happen on the other server. 
t1 = new ChangeNumber(3, 0, 3); t2 = new ChangeNumber(4, 0, 4); t1 = new ChangeNumber(now+3, 0, 3); t2 = new ChangeNumber(now+4, 0, 4); // Replay an add of a value B at time t2 on a second server. attr = Attributes.create(attrType.getNormalizedPrimaryName(), "B"); @@ -326,7 +383,10 @@ // The two values should be the first value added. assertEquals(attrValue1, "A"); assertEquals(attrValue2, "A"); } TestCaseUtils.deleteEntry(DN.decode("cn=test1," + TEST_ROOT_DN_STRING)); TestCaseUtils.deleteEntry(DN.decode("cn=test2," + TEST_ROOT_DN_STRING)); } private static void publishModify(ReplicationBroker broker, ChangeNumber changeNum, @@ -431,7 +491,9 @@ } /** * * Performs a few checks on the provided ADD operations, particularly * that an AddMsg can be created from them with valid values for the * DN, entryUUID, ... fields. */ private void assertFakeOperations(final DN dn1, Entry entry, Iterable<FakeOperation> ops, int assertCount) throws Exception @@ -467,4 +529,175 @@ assertEquals(count, assertCount); } /** * Test the task that purges the replication historical information stored * in the user entries. * Steps: * - create entries containing historical information * - wait for the purge delay * - launch the purge task * - verify that all historical information has been purged * * TODO: another test should be written that configures the task to NOT have * the time to purge everything in one run, and thus to relaunch it to finish * the purge, and verify that the second run starts at the changeNumber where * the previous task run had stopped. * * @throws Exception If the test fails. */ @Test(enabled=true) public void testRecurringPurgeIn1Run() throws Exception { int entryCnt = 10; addEntriesWithHistorical(1, entryCnt); /* // every entry should have its hist try { // Search for matching entries in the test backend InternalSearchOperation op = connection.processSearch( ByteString.valueOf(TEST_ROOT_DN_STRING), SearchScope.WHOLE_SUBTREE, LDAPFilter.decode("(ds-sync-hist=*)")); assertEquals(op.getResultCode(), ResultCode.SUCCESS, op.getErrorMessage().toString()); // Check that all entries have been found LinkedList<SearchResultEntry> entries = op.getSearchEntries(); assertTrue(entries != null); assertEquals(entries.size(), entryCnt); } catch (Exception e) { fail("assertNoConfigEntriesWithFilter: could not search config backend" + e.getMessage()); } */ // set the purge delay to 1 minute TestCaseUtils.dsconfig( "set-replication-domain-prop", "--provider-name","Multimaster Synchronization", "--domain-name",testName, "--set","conflicts-historical-purge-delay:1m"); Thread.sleep(60*1000); // launch the purge Entry taskInit = TestCaseUtils.makeEntry( "dn: ds-task-id=" + UUID.randomUUID() + ",cn=Scheduled Tasks,cn=Tasks", "objectclass: top", "objectclass: ds-task", "objectclass: ds-task-purge-conflicts-historical", "ds-task-class-name: org.opends.server.tasks.PurgeConflictsHistoricalTask", "ds-task-purge-conflicts-historical-domain-dn: "+TEST_ROOT_DN_STRING, "ds-task-purge-conflicts-historical-maximum-duration: 1000"); // 1000 sec addTask(taskInit, ResultCode.SUCCESS, null); // every entry should be purged from its hist try { // Search for matching entries in the test backend InternalSearchOperation op = connection.processSearch( ByteString.valueOf(TEST_ROOT_DN_STRING), SearchScope.WHOLE_SUBTREE, LDAPFilter.decode("(ds-sync-hist=*)")); assertEquals(op.getResultCode(), ResultCode.SUCCESS, op.getErrorMessage().toString()); // Check that no entries have been found LinkedList<SearchResultEntry> entries = op.getSearchEntries(); assertTrue(entries != null);
assertEquals(entries.size(), 0); } catch (Exception e) { fail("assertNoConfigEntriesWithFilter: could not search config backend" + e.getMessage()); } } /** * Add a provided number of generated entries containing historical. * @param dnSuffix A suffix to be added to the dn * @param entryCnt The number of entries to create * @throws Exception */ private void addEntriesWithHistorical(int dnSuffix, int entryCnt) throws Exception { for (int i=0; i<entryCnt;i++) { String sdn = "dn: uid=user"+i+dnSuffix+"," + TEST_ROOT_DN_STRING; // Add a test entry. TestCaseUtils.addEntry( sdn, "objectClass: top", "objectClass: person", "objectClass: organizationalPerson", "objectClass: inetOrgPerson", "uid: user"+i, "cn: Aaccf Amar", "sn: Amar", "givenName: Aaccf", "userPassword: password", "description: Initial description", "displayName: 1" ); // Modify the test entry to give it some history. // Test both single and multi-valued attributes. String path = TestCaseUtils.createTempFile( sdn, "changetype: modify", "add: cn;lang-en", "cn;lang-en: Aaccf Amar", "cn;lang-en: Aaccf A Amar", "-", "replace: givenName", "givenName: new given", "-", "replace: userPassword", "userPassword: new pass", "-", "replace: description", "description: replaced description", "-", "replace: sn", "sn: replaced sn", "-", "add: displayName", "displayName: 2", "-", "delete: displayName", "displayName: 1", "-" ); String[] args = { "-h", "127.0.0.1", "-p", String.valueOf(TestCaseUtils.getServerLdapPort()), "-D", "cn=Directory Manager", "-w", "password", "-f", path }; assertEquals(LDAPModify.mainModify(args, false, null, System.err), 0); args[9] = TestCaseUtils.createTempFile( sdn, "changetype: modify", "replace: displayName", "displayName: 2", "-" ); assertEquals(LDAPModify.mainModify(args, false, null, System.err), 0); } } } opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/ModifyConflictTest.java
@@ -1329,9 +1329,9 @@ * as the initial value. */ entry.removeAttribute(historicalAttrType); entry.addAttribute(hist.encode(), null); entry.addAttribute(hist.encodeAndPurge(), null); EntryHistorical hist2 = EntryHistorical.newInstanceFromEntry(entry); assertEquals(hist2.encode().toString(), hist.encode().toString()); assertEquals(hist2.encodeAndPurge().toString(), hist.encodeAndPurge().toString()); return mods; }