From b9aad30c9e07b179a2c22fad830f6a54b8993bc9 Mon Sep 17 00:00:00 2001
From: ludovicp <ludovicp@localhost>
Date: Wed, 18 Aug 2010 13:27:32 +0000
Subject: [PATCH] First pass on solving issue #514, reducing Replication meta data, especially aged information. The default is to keep the replication meta data (historical information) for at least 1 day. Purging occurs on the fly when entries are modified, or via a task. Launching the task will be available through dsreplication in separate commit. 

---
 opends/resource/schema/02-config.ldif                                                                  |   47 ++
 opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/ModifyConflictTest.java |    4 
 opends/src/server/org/opends/server/tasks/PurgeConflictsHistoricalTask.java                            |  286 +++++++++++++++
 opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/HistoricalTest.java     |  329 ++++++++++++++--
 opends/src/server/org/opends/server/replication/plugin/EntryHistorical.java                            |  182 ++++++++-
 opends/src/server/org/opends/server/replication/plugin/MultimasterReplication.java                     |    4 
 opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/DomainFakeCfg.java      |   14 
 opends/resource/config/config.ldif                                                                     |    1 
 opends/src/admin/defn/org/opends/server/admin/std/ReplicationDomainConfiguration.xml                   |   22 +
 opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java                      |  169 ++++++++
 opends/src/admin/messages/ReplicationDomainCfgDefn.properties                                          |    1 
 opends/src/server/org/opends/server/config/ConfigConstants.java                                        |   43 ++
 12 files changed, 1,017 insertions(+), 85 deletions(-)

diff --git a/opends/resource/config/config.ldif b/opends/resource/config/config.ldif
index b697a7e..e820ba1 100644
--- a/opends/resource/config/config.ldif
+++ b/opends/resource/config/config.ldif
@@ -65,6 +65,7 @@
 ds-cfg-allowed-task: org.opends.server.tasks.RebuildTask
 ds-cfg-allowed-task: org.opends.server.tasks.RestoreTask
 ds-cfg-allowed-task: org.opends.server.tasks.ShutdownTask
+ds-cfg-allowed-task: org.opends.server.tasks.PurgeConflictsHistoricalTask
 
 dn: cn=Access Control Handler,cn=config
 objectClass: top
diff --git a/opends/resource/schema/02-config.ldif b/opends/resource/schema/02-config.ldif
index 3936d2f..28b8b4d 100644
--- a/opends/resource/schema/02-config.ldif
+++ b/opends/resource/schema/02-config.ldif
@@ -2490,6 +2490,41 @@
   SYNTAX 1.3.6.1.4.1.1466.115.121.1.27
   SINGLE-VALUE
   X-ORIGIN 'OpenDS Directory Server' )
+attributeTypes: ( 1.3.6.1.4.1.26027.1.1.614
+  NAME 'ds-cfg-conflicts-historical-purge-delay'
+  SYNTAX 1.3.6.1.4.1.1466.115.121.1.15
+  SINGLE-VALUE
+  X-ORIGIN 'OpenDS Directory Server' )
+attributeTypes: ( 1.3.6.1.4.1.26027.1.1.615
+  NAME 'ds-task-purge-conflicts-historical-domain-dn'
+  SYNTAX 1.3.6.1.4.1.1466.115.121.1.15
+  SINGLE-VALUE
+  X-ORIGIN 'OpenDS Directory Server' )
+attributeTypes: ( 1.3.6.1.4.1.26027.1.1.616
+  NAME 'ds-task-purge-conflicts-historical-maximum-duration'
+  SYNTAX 1.3.6.1.4.1.1466.115.121.1.15
+  SINGLE-VALUE
+  X-ORIGIN 'OpenDS Directory Server' )
+attributeTypes: ( 1.3.6.1.4.1.26027.1.1.617
+  NAME 'ds-task-purge-conflicts-historical-last-purged-changenumber'
+  SYNTAX 1.3.6.1.4.1.1466.115.121.1.15
+  SINGLE-VALUE
+  X-ORIGIN 'OpenDS Directory Server' )
+attributeTypes: ( 1.3.6.1.4.1.26027.1.1.618
+  NAME 'ds-task-purge-conflicts-historical-purge-completed-in-time'
+  SYNTAX 1.3.6.1.4.1.1466.115.121.1.15
+  SINGLE-VALUE
+  X-ORIGIN 'OpenDS Directory Server' )
+attributeTypes: ( 1.3.6.1.4.1.26027.1.1.619
+  NAME 'ds-task-purge-conflicts-historical-purged-values-count'
+  SYNTAX 1.3.6.1.4.1.1466.115.121.1.15
+  SINGLE-VALUE
+  X-ORIGIN 'OpenDS Directory Server' )
+attributeTypes: ( 1.3.6.1.4.1.26027.1.1.620
+  NAME 'ds-task-purge-conflicts-historical-first-purged-changenumber'
+  SYNTAX 1.3.6.1.4.1.1466.115.121.1.15
+  SINGLE-VALUE
+  X-ORIGIN 'OpenDS Directory Server' )
 objectClasses: ( 1.3.6.1.4.1.26027.1.2.1
   NAME 'ds-cfg-access-control-handler'
   SUP top
@@ -3057,6 +3092,7 @@
         ds-cfg-fractional-exclude $
         ds-cfg-fractional-include $
         ds-cfg-solve-conflicts $
+        ds-cfg-conflicts-historical-purge-delay $
         ds-cfg-changetime-heartbeat-interval $
         ds-cfg-log-changenumber $
         ds-cfg-initialization-window-size )
@@ -4196,4 +4232,15 @@
   SUP ds-cfg-virtual-attribute
   STRUCTURAL
   X-ORIGIN 'OpenDS Directory Server' )
+objectClasses: ( 1.3.6.1.4.1.26027.1.2.237
+  NAME 'ds-task-purge-conflicts-historical'
+  SUP ds-task
+  STRUCTURAL
+  MUST ( ds-task-purge-conflicts-historical-domain-dn )
+  MAY ( ds-task-purge-conflicts-historical-maximum-duration $
+        ds-task-purge-conflicts-historical-first-purged-changenumber $
+        ds-task-purge-conflicts-historical-last-purged-changenumber $
+        ds-task-purge-conflicts-historical-purge-completed-in-time $
+        ds-task-purge-conflicts-historical-purged-values-count )
+  X-ORIGIN 'OpenDS Directory Server' )
 
diff --git a/opends/src/admin/defn/org/opends/server/admin/std/ReplicationDomainConfiguration.xml b/opends/src/admin/defn/org/opends/server/admin/std/ReplicationDomainConfiguration.xml
index 1bc7968..95aafdc 100644
--- a/opends/src/admin/defn/org/opends/server/admin/std/ReplicationDomainConfiguration.xml
+++ b/opends/src/admin/defn/org/opends/server/admin/std/ReplicationDomainConfiguration.xml
@@ -536,4 +536,26 @@
       </ldap:attribute>
     </adm:profile>
   </adm:property>
+  <adm:property name="conflicts-historical-purge-delay">
+    <adm:synopsis>
+      This delay indicates the time (in minutes) the domain keeps the historical
+      information necessary to solve conflicts.When a change stored in the 
+      historical part of the user entry has a date (from its replication ChangeNumber)
+      older than this delay, it is candidate to be purged.
+      The purge is applied on 2 events: modify of the entry, dedicated purge task.
+    </adm:synopsis>
+    <adm:default-behavior>
+      <adm:defined>
+        <adm:value>1440m</adm:value>
+      </adm:defined>
+    </adm:default-behavior>
+    <adm:syntax>
+      <adm:duration base-unit="m" allow-unlimited="false" />
+    </adm:syntax>
+    <adm:profile name="ldap">
+      <ldap:attribute>
+        <ldap:name>ds-cfg-conflicts-historical-purge-delay</ldap:name>
+      </ldap:attribute>
+    </adm:profile>
+  </adm:property>
 </adm:managed-object>
diff --git a/opends/src/admin/messages/ReplicationDomainCfgDefn.properties b/opends/src/admin/messages/ReplicationDomainCfgDefn.properties
index 26959a8..00bf9c6 100644
--- a/opends/src/admin/messages/ReplicationDomainCfgDefn.properties
+++ b/opends/src/admin/messages/ReplicationDomainCfgDefn.properties
@@ -13,6 +13,7 @@
 property.base-dn.synopsis=Specifies the base DN of the replicated data.
 property.changetime-heartbeat-interval.synopsis=Specifies the heart-beat interval that the Directory Server will use when sending its local change time to the Replication Server.
 property.changetime-heartbeat-interval.description=The Directory Server sends a regular heart-beat to the Replication within the specified interval. The heart-beat indicates the change time of the Directory Server to the Replication Server.
+property.conflicts-historical-purge-delay.synopsis=This delay indicates the time (in minutes) the domain keeps the historical information necessary to solve conflicts.When a change stored in the historical part of the user entry has a date (from its replication ChangeNumber) older than this delay, it is candidate to be purged. The purge is applied on 2 events: modify of the entry, dedicated purge task.
 property.fractional-exclude.synopsis=Allows to exclude some attributes to replicate to this server.
 property.fractional-exclude.description=If fractional-exclude configuration attribute is used, attributes specified in this attribute will be ignored (not added/modified/deleted) when an operation performed from another directory server is being replayed in the local server. Note that the usage of this configuration attribute is mutually exclusive with the usage of the fractional-include attribute.
 property.fractional-exclude.syntax.string.pattern.synopsis=Defines attribute(s) of one particular class or of all possible classes, to exclude from the replication.
diff --git a/opends/src/server/org/opends/server/config/ConfigConstants.java b/opends/src/server/org/opends/server/config/ConfigConstants.java
index 392e068..7e1b388 100644
--- a/opends/src/server/org/opends/server/config/ConfigConstants.java
+++ b/opends/src/server/org/opends/server/config/ConfigConstants.java
@@ -4370,5 +4370,48 @@
    */
   public static final String ATTR_IMPORT_CLEAR_BACKEND =
        NAME_PREFIX_TASK + "import-clear-backend";
+
+  /**
+   * The name of the attribute in a purge conflicts historical task definition
+   * that specifies the base dn related to the synchonization domain to purge.
+   */
+  public static final String ATTR_TASK_CONFLICTS_HIST_PURGE_DOMAIN_DN =
+       NAME_PREFIX_TASK + "purge-conflicts-historical-domain-dn";
+
+  /**
+   * The name of the attribute in a purge conflicts historical task definition
+   * that specifies the maximum duration of the task.
+   */
+  public static final String ATTR_TASK_CONFLICTS_HIST_PURGE_MAX_DURATION =
+       NAME_PREFIX_TASK + "purge-conflicts-historical-maximum-duration";
+
+  /**
+   * The name of the attribute in a purge conflicts historical task definition
+   * that specifies the maximum duration of the task.
+   */
+  public static final String ATTR_TASK_CONFLICTS_HIST_PURGE_FIRST_CN =
+     NAME_PREFIX_TASK + "purge-conflicts-historical-first-purged-changenumber";
+
+  /**
+   * The name of the attribute in a purge conflicts historical task definition
+   * that specifies the maximum duration of the task.
+   */
+  public static final String ATTR_TASK_CONFLICTS_HIST_PURGE_LAST_CN =
+       NAME_PREFIX_TASK + "purge-conflicts-historical-last-purged-changenumber";
+
+  /**
+   * The name of the attribute in a purge conflicts historical task definition
+   * that specifies the maximum duration of the task.
+   */
+  public static final String ATTR_TASK_CONFLICTS_HIST_PURGE_COMPLETED_IN_TIME =
+       NAME_PREFIX_TASK + "purge-conflicts-historical-purge-completed-in-time";
+
+  /**
+   * The name of the attribute in a purge conflicts historical task definition
+   * that specifies the maximum duration of the task.
+   */
+  public static final String ATTR_TASK_CONFLICTS_HIST_PURGE_COUNT =
+       NAME_PREFIX_TASK + "purge-conflicts-historical-purged-values-count";
+
 }
 
diff --git a/opends/src/server/org/opends/server/replication/plugin/EntryHistorical.java b/opends/src/server/org/opends/server/replication/plugin/EntryHistorical.java
index 408cc1c..8193815 100644
--- a/opends/src/server/org/opends/server/replication/plugin/EntryHistorical.java
+++ b/opends/src/server/org/opends/server/replication/plugin/EntryHistorical.java
@@ -46,6 +46,7 @@
 import org.opends.server.types.operation.PreOperationAddOperation;
 import org.opends.server.types.operation.PreOperationModifyDNOperation;
 import org.opends.server.types.operation.PreOperationModifyOperation;
+import org.opends.server.util.TimeThread;
 
 /**
  * This class is used to store historical information that is
@@ -83,6 +84,33 @@
    */
   public static final String ENTRYUIDNAME = "entryuuid";
 
+  /* The delay to purge the historical informations
+   * This delay indicates the time the domain keeps the historical
+   * information necessary to solve conflicts.When a change stored in the
+   * historical part of the user entry has a date (from its replication
+   * ChangeNumber) older than this delay, it is candidate to be purged.
+   * The purge is triggered on 2 events: modify of the entry, dedicated purge
+   * task.
+   *
+   * The purge is done when the historical is encoded.
+   */
+  private long purgeDelayInMillisec = -1;
+
+  /*
+   * The oldest ChangeNumber stored in this entry historical attribute.
+   * null when this historical object has been created from
+   * an entry that has no historical attribute and after the last
+   * historical has been purged.
+   */
+  private ChangeNumber oldestChangeNumber = null;
+
+  /**
+   * For stats/monitoring purpose, the number of historical values
+   * purged the last time a purge has been applied on this entry historical.
+   */
+  private int lastPurgedValuesCount = 0;
+
+
   /**
    * The in-memory historical information is made of.
    *
@@ -140,7 +168,7 @@
   public String toString()
   {
     StringBuilder builder = new StringBuilder();
-    builder.append(encode());
+    builder.append(encodeAndPurge());
     return builder.toString();
   }
 
@@ -224,7 +252,7 @@
     //
     // - add the modification of the ds-sync-hist attribute,
     // to the current modifications of the MOD operation
-    Attribute attr = encode();
+    Attribute attr = encodeAndPurge();
     Modification mod = new Modification(ModificationType.REPLACE, attr);
     mods.add(mod);
     // - update the already modified entry
@@ -251,7 +279,7 @@
     Entry modifiedEntry = modifyDNOperation.getUpdatedEntry();
     List<Modification> mods = modifyDNOperation.getModifications();
 
-    Attribute attr = encode();
+    Attribute attr = encodeAndPurge();
 
     // Now do the 2 updates required by the core to be consistent:
     //
@@ -394,20 +422,40 @@
   }
 
   /**
-   * Encode this historical information object in an operational attribute.
+   * For stats/monitoring purpose, returns the number of historical values
+   * purged the last time a purge has been applied on this entry historical.
+   *
+   * @return the purged values count.
+   */
+  public int getLastPurgedValuesCount()
+  {
+    return this.lastPurgedValuesCount;
+  }
+
+  /**
+   * Encode this historical information object in an operational attribute
+   * and purge it from the values older than the purge delay.
    *
    * @return The historical information encoded in an operational attribute.
    */
-  public Attribute encode()
+  public Attribute encodeAndPurge()
   {
+    long purgeDate = 0;
+
+    // Set the stats counter to 0 and compute the purgeDate to now minus
+    // the potentially set purge delay.
+    this.lastPurgedValuesCount = 0;
+    if (purgeDelayInMillisec>0)
+      purgeDate = TimeThread.getTime() - purgeDelayInMillisec;
+
     AttributeType historicalAttrType =
       DirectoryServer.getSchema().getAttributeType(HISTORICALATTRIBUTENAME);
     AttributeBuilder builder = new AttributeBuilder(historicalAttrType);
 
-    // Encode the historical information for modify operation.
     for (Map.Entry<AttributeType, AttrHistoricalWithOptions> entryWithOptions :
           attributesHistorical.entrySet())
     {
+      // Encode an attribute type
       AttributeType type = entryWithOptions.getKey();
       HashMap<Set<String> , AttrHistorical> attrwithoptions =
                                 entryWithOptions.getValue().getAttributesInfo();
@@ -415,10 +463,11 @@
       for (Map.Entry<Set<String>, AttrHistorical> entry :
            attrwithoptions.entrySet())
       {
+        // Encode an (attribute type/option)
         boolean delAttr = false;
         Set<String> options = entry.getKey();
         String optionsString = "";
-        AttrHistorical info = entry.getValue();
+        AttrHistorical attrHist = entry.getValue();
 
 
         if (options != null)
@@ -432,49 +481,63 @@
           optionsString = optionsBuilder.toString();
         }
 
-        ChangeNumber deleteTime = info.getDeleteTime();
+        ChangeNumber deleteTime = attrHist.getDeleteTime();
         /* generate the historical information for deleted attributes */
         if (deleteTime != null)
         {
           delAttr = true;
         }
 
-        /* generate the historical information for modified attribute values */
-        for (AttrValueHistorical valInfo : info.getValuesHistorical())
+        for (AttrValueHistorical attrValHist : attrHist.getValuesHistorical())
         {
+          // Encode an attribute value
           String strValue;
-          if (valInfo.getValueDeleteTime() != null)
+          if (attrValHist.getValueDeleteTime() != null)
           {
+            // this hist must be purged now, so skip its encoding
+            if ((purgeDelayInMillisec>0) &&
+                (attrValHist.getValueDeleteTime().getTime()<=purgeDate))
+            {
+              this.lastPurgedValuesCount++;
+              continue;
+            }
             strValue = type.getNormalizedPrimaryName() + optionsString + ":" +
-            valInfo.getValueDeleteTime().toString() +
-            ":del:" + valInfo.getAttributeValue().toString();
+            attrValHist.getValueDeleteTime().toString() +
+            ":del:" + attrValHist.getAttributeValue().toString();
             AttributeValue val = AttributeValues.create(historicalAttrType,
                                                     strValue);
             builder.add(val);
           }
-          else if (valInfo.getValueUpdateTime() != null)
+          else if (attrValHist.getValueUpdateTime() != null)
           {
-            if ((delAttr && valInfo.getValueUpdateTime() == deleteTime)
-               && (valInfo.getAttributeValue() != null))
+            if ((purgeDelayInMillisec>0) &&
+                (attrValHist.getValueUpdateTime().getTime()<=purgeDate))
+            {
+              // this hist must be purged now, so skip its encoding
+              this.lastPurgedValuesCount++;
+              continue;
+            }
+            if ((delAttr && attrValHist.getValueUpdateTime() == deleteTime)
+               && (attrValHist.getAttributeValue() != null))
             {
               strValue = type.getNormalizedPrimaryName() + optionsString + ":" +
-              valInfo.getValueUpdateTime().toString() +  ":repl:" +
-              valInfo.getAttributeValue().toString();
+              attrValHist.getValueUpdateTime().toString() +  ":repl:" +
+              attrValHist.getAttributeValue().toString();
               delAttr = false;
             }
             else
             {
-              if (valInfo.getAttributeValue() == null)
+              if (attrValHist.getAttributeValue() == null)
               {
                 strValue = type.getNormalizedPrimaryName() + optionsString
-                           + ":" + valInfo.getValueUpdateTime().toString() +
+                           + ":" + attrValHist.getValueUpdateTime().toString() +
                            ":add";
               }
               else
               {
                 strValue = type.getNormalizedPrimaryName() + optionsString
-                           + ":" + valInfo.getValueUpdateTime().toString() +
-                           ":add:" + valInfo.getAttributeValue().toString();
+                           + ":" + attrValHist.getValueUpdateTime().toString() +
+                           ":add:" + attrValHist.getAttributeValue().toString();
               }
             }
 
@@ -486,6 +549,14 @@
 
         if (delAttr)
         {
+          // Stores the attr deletion hist when not older than the purge delay
+          if ((purgeDelayInMillisec>0) &&
+              (deleteTime.getTime()<=purgeDate))
+          {
+            // this hist must be purged now, so skip its encoding
+            this.lastPurgedValuesCount++;
+            continue;
+          }
           String strValue = type.getNormalizedPrimaryName()
               + optionsString + ":" + deleteTime.toString()
               + ":attrDel";
@@ -499,19 +570,48 @@
     // Encode the historical information for the ADD Operation.
     if (entryADDDate != null)
     {
-      builder.add(encodeAddHistorical(entryADDDate));
+      // Stores the ADDDate when not older than the purge delay
+      if ((purgeDelayInMillisec>0) &&
+          (entryADDDate.getTime()<=purgeDate))
+      {
+        this.lastPurgedValuesCount++;
+      }
+      else
+      {
+        builder.add(encodeAddHistorical(entryADDDate));
+      }
     }
 
     // Encode the historical information for the MODDN Operation.
     if (entryMODDNDate != null)
     {
-      builder.add(encodeMODDNHistorical(entryMODDNDate));
+      // Stores the MODDNDate when not older than the purge delay
+      if ((purgeDelayInMillisec>0) &&
+          (entryMODDNDate.getTime()<=purgeDate))
+      {
+        this.lastPurgedValuesCount++;
+      }
+      else
+      {
+        builder.add(encodeMODDNHistorical(entryMODDNDate));
+      }
     }
 
     return builder.toAttribute();
   }
 
   /**
+   * Set the delay to purge the historical informations. The purge is applied
+   * only when historical attribute is updated (write operations).
+   *
+   * @param purgeDelay the purge delay in ms
+   */
+  public void setPurgeDelay(long purgeDelay)
+  {
+    this.purgeDelayInMillisec = purgeDelay;
+  }
+
+  /**
    * Indicates if the Entry was renamed or added after the ChangeNumber
    * that is given as a parameter.
    *
@@ -602,6 +702,9 @@
           AttributeValue value = histVal.getAttributeValue();
           HistAttrModificationKey histKey = histVal.getHistKey();
 
+          // update the oldest ChangeNumber stored in the new entry historical
+          newHistorical.updateOldestCN(cn);
+
           if (histVal.isADDOperation())
           {
             newHistorical.entryADDDate = cn;
@@ -834,5 +937,36 @@
     return
       attrType.getNameOrOID().equals(EntryHistorical.HISTORICALATTRIBUTENAME);
   }
+
+  /**
+   * Potentially update the oldest ChangeNumber stored in this entry historical
+   * with the provided ChangeNumber when its older than the current oldest.
+   *
+   * @param cn the provided ChangeNumber.
+   */
+  private void updateOldestCN(ChangeNumber cn)
+  {
+    if (cn != null)
+    {
+      if (this.oldestChangeNumber == null)
+        this.oldestChangeNumber = cn;
+      else
+        if (cn.older(this.oldestChangeNumber))
+            this.oldestChangeNumber = cn;
+    }
+  }
+
+  /**
+   * Returns the oldest ChangeNumber stored in this entry historical attribute.
+   *
+   * @return the oldest ChangeNumber stored in this entry historical attribute.
+   *         Returns null when this historical object has been created from
+   *         an entry that has no historical attribute and after the last
+   *         historical has been purged.
+   */
+  public ChangeNumber getOldestCN()
+  {
+    return this.oldestChangeNumber;
+  }
 }
 
diff --git a/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java b/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
index 7ad8093..f4bc7a4 100644
--- a/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
+++ b/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
@@ -24,6 +24,7 @@
  *
  *      Copyright 2006-2010 Sun Microsystems, Inc.
  */
+
 package org.opends.server.replication.plugin;
 
 import static org.opends.messages.ReplicationMessages.*;
@@ -62,9 +63,13 @@
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.zip.DataFormatException;
+import java.util.Date;
 
+import org.opends.server.util.TimeThread;
+import org.opends.messages.Category;
 import org.opends.messages.Message;
 import org.opends.messages.MessageBuilder;
+import org.opends.messages.Severity;
 import org.opends.server.admin.server.ConfigurationChangeListener;
 import org.opends.server.admin.std.meta.ReplicationDomainCfgDefn.*;
 import org.opends.server.admin.std.server.ExternalChangelogDomainCfg;
@@ -162,6 +167,7 @@
 import org.opends.server.util.LDIFReader;
 import org.opends.server.workflowelement.externalchangelog.ECLWorkflowElement;
 import org.opends.server.workflowelement.localbackend.*;
+import org.opends.server.tasks.PurgeConflictsHistoricalTask;
 
 /**
  *  This class implements the bulk part of the.of the Directory Server side
@@ -239,7 +245,7 @@
    */
   public static final String DS_SYNC_CONFLICT = "ds-sync-conflict";
 
-  /**
+ /**
    * The tracer object for the debug logger.
    */
   private static final DebugTracer TRACER = getTracer();
@@ -408,6 +414,18 @@
   // ChangeNumbers.
   private boolean logChangeNumber = false;
 
+  // This configuration integer indicates the time the domain keeps the
+  // historical information necessary to solve conflicts.
+  // When a change stored in the historical part of the user entry has a date
+  // (from its replication ChangeNumber) older than this delay, it is candidate
+  // to be purged.
+  private long histPurgeDelayInMilliSec = 0;
+
+  // The last change number purged in this domain. Allows to have a continuous
+  // purging process from one purge processing (task run) to the next one.
+  // Values 0 when the server starts.
+  private ChangeNumber lastChangeNumberPurgedFromHist = new ChangeNumber(0,0,0);
+
   /**
    * The thread that periodically saves the ServerState of this
    * LDAPReplicationDomain in the database.
@@ -547,14 +565,17 @@
 
     // Read the configuration parameters.
     Set<String> replicationServers = configuration.getReplicationServer();
-    serverId = configuration.getServerId();
-    baseDn = configuration.getBaseDN();
+
+    this.serverId = configuration.getServerId();
+    this.baseDn = configuration.getBaseDN();
     int window  = configuration.getWindowSize();
     heartbeatInterval = configuration.getHeartbeatInterval();
-    isolationpolicy = configuration.getIsolationPolicy();
-    configDn = configuration.dn();
-    logChangeNumber = configuration.isLogChangenumber();
+    this.isolationpolicy = configuration.getIsolationPolicy();
+    this.configDn = configuration.dn();
+    this.logChangeNumber = configuration.isLogChangenumber();
     this.updateToReplayQueue = updateToReplayQueue;
+    this.histPurgeDelayInMilliSec =
+      configuration.getConflictsHistoricalPurgeDelay()*60*1000;
 
     // Get assured configuration
     readAssuredConfig(configuration, false);
@@ -2423,6 +2444,12 @@
    * Check if the operation that just happened has cleared a conflict :
    * Clearing a conflict happens if the operation has free a DN that
    * for which an other entry was in conflict.
+   * Steps:
+   * - get the DN freed by a DELETE or MODRDN op
+   * - search for entries put in the conflict space (dn=entryuid'+'....)
+   *   because the expected DN was not available (ds-sync-conflict=expected DN)
+   * - retain the entry with the oldest conflict
+   * - rename this entry with the freedDN as it was expected originally
    */
    private void checkForClearedConflict(PostOperationOperation op)
    {
@@ -2433,14 +2460,14 @@
        return;
      }
 
-     DN targetDN;
+     DN freedDN;
      if (type == OperationType.DELETE)
      {
-       targetDN = ((PostOperationDeleteOperation) op).getEntryDN();
+       freedDN = ((PostOperationDeleteOperation) op).getEntryDN();
      }
      else if (type == OperationType.MODIFY_DN)
      {
-       targetDN = ((PostOperationModifyDNOperation) op).getEntryDN();
+       freedDN = ((PostOperationModifyDNOperation) op).getEntryDN();
      }
      else
      {
@@ -2451,7 +2478,7 @@
     try
     {
       filter = LDAPFilter.decode(
-          DS_SYNC_CONFLICT + "=" + targetDN.toNormalizedString());
+          DS_SYNC_CONFLICT + "=" + freedDN.toNormalizedString());
     } catch (LDAPException e)
     {
       // Not possible. We know the filter just above is correct.
@@ -2491,7 +2518,7 @@
      {
        DN entryDN = entrytoRename.getDN();
        ModifyDNOperationBasis newOp = renameEntry(
-           entryDN, targetDN.getRDN(), targetDN.getParent(), false);
+           entryDN, freedDN.getRDN(), freedDN.getParent(), false);
 
        ResultCode res = newOp.getResultCode();
        if (res != ResultCode.SUCCESS)
@@ -4410,6 +4437,8 @@
   {
     isolationpolicy = configuration.getIsolationPolicy();
     logChangeNumber = configuration.isLogChangenumber();
+    histPurgeDelayInMilliSec =
+      configuration.getConflictsHistoricalPurgeDelay()*60*1000;
 
     changeConfig(
         configuration.getReplicationServer(),
@@ -5588,4 +5617,122 @@
   {
     return this.eclDomain.isEnabled();
   }
+
+  /**
+   * Return the purge delay (in ms) for the historical information stored
+   * in entries to solve conflicts for this domain.
+   *
+   * @return the purge delay.
+   */
+  public long getHistoricalPurgeDelay()
+  {
+    return histPurgeDelayInMilliSec;
+  }
+
+  /**
+   * Check if the operation that just happened has cleared a conflict :
+   * Clearing a conflict happens if the operation has free a DN that
+   * for which an other entry was in conflict.
+   * Steps:
+   * - get the DN freed by a DELETE or MODRDN op
+   * - search for entries put in the conflict space (dn=entryuid'+'....)
+   *   because the expected DN was not available (ds-sync-conflict=expected DN)
+   * - retain the entry with the oldest conflict
+   * - rename this entry with the freedDN as it was expected originally
+   *
+   * @param task     the task raising this purge.
+   * @param endDate  the date to stop this task whether the job is done or not.
+   * @throws DirectoryException when an exception happens.
+   *
+   */
+   public void purgeConflictsHistorical(PurgeConflictsHistoricalTask task,
+       long endDate)
+   throws DirectoryException
+   {
+     LDAPFilter filter = null;
+
+     TRACER.debugInfo("[PURGE] purgeConflictsHistorical "
+         + "on domain: " + baseDn
+         + "endDate:" + new Date(endDate)
+         + "lastChangeNumberPurgedFromHist: "
+         + lastChangeNumberPurgedFromHist.toStringUI());
+
+     try
+     {
+       filter = LDAPFilter.decode(
+         "(" + EntryHistorical.HISTORICALATTRIBUTENAME + ">=dummy:"
+         + lastChangeNumberPurgedFromHist + ")");
+
+     } catch (LDAPException e)
+     {
+       // Not possible. We know the filter just above is correct.
+     }
+
+     LinkedHashSet<String> attrs = new LinkedHashSet<String>(1);
+     attrs.add(EntryHistorical.HISTORICALATTRIBUTENAME);
+     attrs.add(EntryHistorical.ENTRYUIDNAME);
+     attrs.add("*");
+     InternalSearchOperation searchOp =  conn.processSearch(
+         ByteString.valueOf(baseDn.toString()),
+         SearchScope.WHOLE_SUBTREE,
+         DereferencePolicy.NEVER_DEREF_ALIASES,
+         0, 0, false, filter,
+         attrs, null);
+
+     int count = 0;
+     task.setProgressStats(lastChangeNumberPurgedFromHist, count);
+
+     LinkedList<SearchResultEntry> entries = searchOp.getSearchEntries();
+     for (SearchResultEntry entry : entries)
+     {
+       long maxTimeToRun = endDate - TimeThread.getTime();
+       if (maxTimeToRun<0)
+       {
+         Message errMsg = Message.raw(Category.SYNC, Severity.NOTICE,
+             " end date reached");
+         DirectoryException de = new DirectoryException(
+             ResultCode.ADMIN_LIMIT_EXCEEDED,
+             errMsg);
+         throw (de);
+       }
+
+       EntryHistorical entryHist = EntryHistorical.newInstanceFromEntry(entry);
+       lastChangeNumberPurgedFromHist = entryHist.getOldestCN();
+       entryHist.setPurgeDelay(this.histPurgeDelayInMilliSec);
+       Attribute attr = entryHist.encodeAndPurge();
+       count += entryHist.getLastPurgedValuesCount();
+       List<Modification> mods = new LinkedList<Modification>();
+       Modification mod;
+       mod = new Modification(ModificationType.REPLACE, attr);
+       mods.add(mod);
+
+       ModifyOperationBasis newOp =
+         new ModifyOperationBasis(
+             conn, InternalClientConnection.nextOperationID(),
+             InternalClientConnection.nextMessageID(),
+             new ArrayList<Control>(0),
+             entry.getDN(),
+             mods);
+       newOp.setInternalOperation(true);
+       newOp.setSynchronizationOperation(true);
+       newOp.setDontSynchronize(true);
+
+       newOp.run();
+
+       if (newOp.getResultCode() != ResultCode.SUCCESS)
+       {
+         // Log information for the repair tool.
+         MessageBuilder mb = new MessageBuilder();
+         mb.append(ERR_CANNOT_ADD_CONFLICT_ATTRIBUTE.get());
+         mb.append(String.valueOf(newOp));
+         mb.append(" ");
+         mb.append(String.valueOf(newOp.getResultCode()));
+         logError(mb.toMessage());
+       }
+       else
+       {
+         task.setProgressStats(lastChangeNumberPurgedFromHist, count);
+       }
+     }
+   }
 }
diff --git a/opends/src/server/org/opends/server/replication/plugin/MultimasterReplication.java b/opends/src/server/org/opends/server/replication/plugin/MultimasterReplication.java
index ddc8b01..ede7a83 100644
--- a/opends/src/server/org/opends/server/replication/plugin/MultimasterReplication.java
+++ b/opends/src/server/org/opends/server/replication/plugin/MultimasterReplication.java
@@ -503,6 +503,8 @@
           historicalInformation);
     }
 
+    historicalInformation.setPurgeDelay(domain.getHistoricalPurgeDelay());
+
     historicalInformation.setHistoricalAttrToOperation(modifyOperation);
 
     if (modifyOperation.getModifications().isEmpty())
@@ -556,6 +558,8 @@
           historicalInformation);
     }
 
+    historicalInformation.setPurgeDelay(domain.getHistoricalPurgeDelay());
+
     // Add to the operation the historical attribute : "dn:changeNumger:moddn"
     historicalInformation.setHistoricalAttrToOperation(modifyDNOperation);
 
diff --git a/opends/src/server/org/opends/server/tasks/PurgeConflictsHistoricalTask.java b/opends/src/server/org/opends/server/tasks/PurgeConflictsHistoricalTask.java
new file mode 100644
index 0000000..f2cb1f0
--- /dev/null
+++ b/opends/src/server/org/opends/server/tasks/PurgeConflictsHistoricalTask.java
@@ -0,0 +1,286 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License").  You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE
+ * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
+ * add the following below this CDDL HEADER, with the fields enclosed
+ * by brackets "[]" replaced with your own identifying information:
+ *      Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ *      Copyright 2006-2010 Sun Microsystems, Inc.
+ */
+package org.opends.server.tasks;
+import org.opends.server.replication.plugin.LDAPReplicationDomain;
+
+import org.opends.server.types.ResultCode;
+
+import org.opends.messages.MessageBuilder;
+
+
+import org.opends.messages.Message;
+import org.opends.messages.Category;
+import org.opends.messages.Severity;
+
+import static org.opends.server.config.ConfigConstants.*;
+import static org.opends.server.core.DirectoryServer.getAttributeType;
+import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
+import static org.opends.server.loggers.debug.DebugLogger.getTracer;
+import java.util.List;
+
+import org.opends.messages.TaskMessages;
+import org.opends.server.backends.task.Task;
+import org.opends.server.backends.task.TaskState;
+import org.opends.server.loggers.debug.DebugTracer;
+import org.opends.server.types.Attribute;
+import org.opends.server.types.AttributeType;
+import org.opends.server.types.DN;
+import org.opends.server.types.DirectoryException;
+import org.opends.server.types.Entry;
+import org.opends.server.util.TimeThread;
+import org.opends.server.replication.common.ChangeNumber;
+
+/**
+ * This class provides an implementation of a Directory Server task that can
+ * be used to purge the replication historical informations stored in the
+ * user entries to solve conflicts.
+ */
+public class PurgeConflictsHistoricalTask extends Task
+{
+  /**
+   * The tracer object for the debug logger.
+   */
+  private static final DebugTracer TRACER = getTracer();
+
+  private String  domainString = null;
+  private LDAPReplicationDomain domain = null;
+
+  // The last changeNumber purged : help the user to know how well the purge
+  // processing has done its job.
+  // We want to help the user know:
+  // - the task has started at dateX time, will end at dateY max
+  //   and is currently purging dateZ
+  long currentCNPurgedDate = 0;
+  long taskMaxEndDate = 0;
+
+  /**
+   *                 current historical purge delay
+   *                <--------------------------------->
+   * -----------------------------------------------------------------> t
+   *               |                           |            |
+   *           current                      task           task
+   *           CN being purged           start date    max end date
+   *                                           <------------>
+   *                                          config.purgeMaxDuration
+   *
+   * The task will start purging the change with the oldest CN found.
+   * The task run as long as :
+   *  - the end date (computed from the configured max duration) is not reached
+   *  - the CN purged is oldest than the configured historical purge delay
+   *
+   *
+   */
+
+  long purgeTaskMaxDurationInSec = 3600; // Default:1h
+
+  TaskState initState;
+
+
+  private static final void debugInfo(String s)
+  {
+    if (debugEnabled())
+    {
+      System.out.println(Message.raw(Category.SYNC, Severity.NOTICE, s));
+      TRACER.debugInfo(s);
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  public Message getDisplayName() {
+    return TaskMessages.INFO_TASK_PURGE_CONFLICTS_HIST_NAME.get();
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override public void initializeTask() throws DirectoryException
+  {
+    if (TaskState.isDone(getTaskState()))
+    {
+      return;
+    }
+
+    // FIXME -- Do we need any special authorization here?
+    Entry taskEntry = getTaskEntry();
+
+    AttributeType typeDomainBase;
+    typeDomainBase =
+      getAttributeType(ATTR_TASK_CONFLICTS_HIST_PURGE_DOMAIN_DN, true);
+
+    List<Attribute> attrList;
+    attrList = taskEntry.getAttribute(typeDomainBase);
+    domainString = TaskUtils.getSingleValueString(attrList);
+
+    try
+    {
+      DN dn = DN.decode(domainString);
+      // We can assume that this is an LDAP replication domain
+      domain = LDAPReplicationDomain.retrievesReplicationDomain(dn);
+    }
+    catch(DirectoryException e)
+    {
+      MessageBuilder mb = new MessageBuilder();
+      mb.append(TaskMessages.ERR_TASK_INITIALIZE_INVALID_DN.get());
+      mb.append(e.getMessage());
+      throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
+          mb.toMessage());
+    }
+
+    AttributeType typeMaxDuration;
+    typeMaxDuration =
+      getAttributeType(ATTR_TASK_CONFLICTS_HIST_PURGE_MAX_DURATION, true);
+    attrList = taskEntry.getAttribute(typeMaxDuration);
+    String maxDurationStringInSec = TaskUtils.getSingleValueString(attrList);
+
+    if (maxDurationStringInSec != null)
+    {
+      try
+      {
+        purgeTaskMaxDurationInSec = Long.decode(maxDurationStringInSec);
+      }
+      catch(Exception e)
+      {
+        throw new DirectoryException(
+            ResultCode.UNWILLING_TO_PERFORM,
+            TaskMessages.ERR_TASK_INVALID_ATTRIBUTE_VALUE.get(
+        ATTR_TASK_CONFLICTS_HIST_PURGE_MAX_DURATION, e.getLocalizedMessage()));
+      }
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  protected TaskState runTask()
+  {
+    Boolean purgeCompletedInTime = false;
+    if (debugEnabled())
+    {
+      debugInfo("[PURGE] PurgeConflictsHistoricalTask is starting "
+          + "on domain: " + domain.getServiceID()
+          + "max duration (sec):" + purgeTaskMaxDurationInSec);
+    }
+    try
+    {
+      replaceAttributeValue(ATTR_TASK_CONFLICTS_HIST_PURGE_COMPLETED_IN_TIME,
+          purgeCompletedInTime.toString());
+
+      // launch the task
+      domain.purgeConflictsHistorical(this,
+          TimeThread.getTime() + (purgeTaskMaxDurationInSec*1000));
+
+      purgeCompletedInTime = true;
+      replaceAttributeValue(ATTR_TASK_CONFLICTS_HIST_PURGE_COMPLETED_IN_TIME,
+          purgeCompletedInTime.toString());
+
+      initState =  TaskState.COMPLETED_SUCCESSFULLY;
+    }
+    catch(DirectoryException de)
+    {
+      debugInfo("[PURGE] PurgeConflictsHistoricalTask exception " +
+          de.getLocalizedMessage());
+      if (de.getResultCode() != ResultCode.ADMIN_LIMIT_EXCEEDED)
+      {
+        // Error raised at submission time
+        logError(de.getMessageObject());
+        initState = TaskState.STOPPED_BY_ERROR;
+      }
+      else
+      {
+        initState =  TaskState.COMPLETED_SUCCESSFULLY;
+      }
+    }
+    finally
+    {
+      try
+      {
+        // sets in the attributes the last stats values
+        replaceAttributeValue(ATTR_TASK_CONFLICTS_HIST_PURGE_COUNT,
+            String.valueOf(this.purgeCount));
+        replaceAttributeValue(ATTR_TASK_CONFLICTS_HIST_PURGE_LAST_CN,
+            this.lastCN.toStringUI());
+        debugInfo("[PURGE] PurgeConflictsHistoricalTask write  attrs ");
+      }
+      catch(Exception e)
+      {
+        debugInfo("[PURGE] PurgeConflictsHistoricalTask exception " +
+            e.getLocalizedMessage());
+        initState = TaskState.STOPPED_BY_ERROR;
+      }
+    }
+
+    if (debugEnabled())
+    {
+      debugInfo("[PURGE] PurgeConflictsHistoricalTask is ending " +
+            "with state:" + initState.toString() +
+            " completedInTime:" + purgeCompletedInTime);
+    }
+    return initState;
+  }
+
+  int updateAttrPeriod = 0;
+  ChangeNumber lastCN;
+  int purgeCount;
+
+  /**
+   * Set the last changenumber purged and the count of purged values in order
+   * to monitor the historical purge.
+   * @param lastCN the last changeNumber purged.
+   * @param purgeCount the count of purged values.
+   */
+  public void setProgressStats(ChangeNumber lastCN, int purgeCount)
+  {
+    try
+    {
+      if (purgeCount == 0)
+        replaceAttributeValue(ATTR_TASK_CONFLICTS_HIST_PURGE_FIRST_CN,
+            lastCN.toStringUI());
+
+      // we don't want the update of the task to overload too much task duration
+      this.purgeCount = purgeCount;
+      this.lastCN = lastCN;
+      if (++updateAttrPeriod % 100 == 0)
+      {
+        replaceAttributeValue(ATTR_TASK_CONFLICTS_HIST_PURGE_COUNT,
+            String.valueOf(purgeCount));
+
+        replaceAttributeValue(ATTR_TASK_CONFLICTS_HIST_PURGE_LAST_CN,
+            lastCN.toStringUI());
+        debugInfo("[PURGE] PurgeConflictsHistoricalTask write  attrs "
+            + purgeCount);
+      }
+    }
+    catch(DirectoryException de)
+    {
+      debugInfo("[PURGE] PurgeConflictsHistoricalTask exception " +
+          de.getLocalizedMessage());
+      initState = TaskState.STOPPED_BY_ERROR;
+    }
+  }
+}
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/DomainFakeCfg.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/DomainFakeCfg.java
index 9bfc700..e00d04c 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/DomainFakeCfg.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/DomainFakeCfg.java
@@ -492,4 +492,18 @@
   {
     return true;
   }
+  
+  /**
+   * Gets the "conflicts-historical-purge-delay" property.
+   * <p>
+   * This delay indicates the time (in minutes) the domain keeps the
+   * historical information necessary to solve conflicts.
+   *
+   * @return Returns the value of the "conflicts-historical-purge-delay" property.
+   **/
+  public long getConflictsHistoricalPurgeDelay()
+  {
+    return 1440;
+  }
+
 }
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/HistoricalTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/HistoricalTest.java
index 7ccdf26..dd16601 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/HistoricalTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/HistoricalTest.java
@@ -34,28 +34,44 @@
 import org.opends.server.replication.common.ChangeNumber;
 import org.opends.server.replication.protocol.AddMsg;
 import org.opends.server.replication.protocol.ModifyMsg;
+import org.opends.server.schema.DirectoryStringSyntax;
 import org.opends.server.TestCaseUtils;
 import org.opends.server.protocols.internal.InternalClientConnection;
+import org.opends.server.protocols.internal.InternalSearchOperation;
+import org.opends.server.protocols.ldap.LDAPFilter;
 import org.opends.server.tools.LDAPModify;
 import org.opends.server.types.AbstractOperation;
 import org.opends.server.types.Attributes;
+import org.opends.server.types.ByteString;
 import org.opends.server.types.DN;
 import org.opends.server.types.Entry;
 import org.opends.server.types.Attribute;
 import org.opends.server.types.Modification;
 import org.opends.server.types.ModificationType;
 import org.opends.server.types.AttributeType;
+import org.opends.server.types.ResultCode;
+import org.opends.server.types.SearchResultEntry;
+import org.opends.server.types.SearchScope;
+import org.opends.server.types.operation.PluginOperation;
+import org.opends.server.backends.task.TaskBackend;
+import org.opends.server.backends.task.TaskState;
 import org.opends.server.core.DirectoryServer;
 import org.testng.annotations.Test;
 import org.testng.annotations.BeforeClass;
 
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+import static org.opends.messages.ReplicationMessages.ERR_NO_MATCHING_DOMAIN;
 import static org.opends.server.TestCaseUtils.*;
+import static org.opends.server.config.ConfigConstants.ATTR_TASK_CONFLICTS_HIST_PURGE_COUNT;
+import static org.opends.server.config.ConfigConstants.ATTR_TASK_CONFLICTS_HIST_PURGE_COMPLETED_IN_TIME;
 
 import java.net.ServerSocket;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.ArrayList;
+import java.util.UUID;
 
 /**
  * Tests the Historical class.
@@ -64,6 +80,7 @@
      extends ReplicationTestCase
 {
   private int replServerPort;
+  String testName = "historicalTest";
 
   /**
    * Set up replication on the test backend.
@@ -96,7 +113,6 @@
     replServerEntry = TestCaseUtils.entryFromLdifString(replServerLdif);
 
     // The suffix to be synchronized.
-    String testName = "historicalTest";
     String synchroServerStringDN = "cn=" + testName + ", cn=domains, " +
       SYNCHRO_PLUGIN_DN;
     String synchroServerLdif = "dn: " + synchroServerStringDN + "\n"
@@ -115,66 +131,68 @@
   /**
    * Tests that the attribute modification history is correctly read from
    * and written to an operational attribute of the entry.
+   * Also test that historical is purged according to the purge delay that
+   * is provided.
    * @throws Exception If the test fails.
    */
-  @Test
-  public void testEncoding()
+  @Test(enabled=true)
+  public void testEncodingAndPurge()
        throws Exception
   {
     //  Add a test entry.
     TestCaseUtils.addEntry(
-         "dn: uid=user.1," + TEST_ROOT_DN_STRING,
-         "objectClass: top",
-         "objectClass: person",
-         "objectClass: organizationalPerson",
-         "objectClass: inetOrgPerson",
-         "uid: user.1",
-         "cn: Aaccf Amar",
-         "sn: Amar",
-         "givenName: Aaccf",
-         "userPassword: password",
-         "description: Initial description",
-         "displayName: 1"
-       );
+        "dn: uid=user.1," + TEST_ROOT_DN_STRING,
+        "objectClass: top",
+        "objectClass: person",
+        "objectClass: organizationalPerson",
+        "objectClass: inetOrgPerson",
+        "uid: user.1",
+        "cn: Aaccf Amar",
+        "sn: Amar",
+        "givenName: Aaccf",
+        "userPassword: password",
+        "description: Initial description",
+        "displayName: 1"
+    );
 
     // Modify the test entry to give it some history.
     // Test both single and multi-valued attributes.
 
     String path = TestCaseUtils.createTempFile(
-         "dn: uid=user.1," + TEST_ROOT_DN_STRING,
-         "changetype: modify",
-         "add: cn;lang-en",
-         "cn;lang-en: Aaccf Amar",
-         "cn;lang-en: Aaccf A Amar",
-         "-",
-         "replace: description",
-         "description: replaced description",
-         "-",
-         "add: displayName",
-         "displayName: 2",
-         "-",
-         "delete: displayName",
-         "displayName: 1",
-         "-"
+        "dn: uid=user.1," + TEST_ROOT_DN_STRING,
+        "changetype: modify",
+        "add: cn;lang-en",
+        "cn;lang-en: Aaccf Amar",
+        "cn;lang-en: Aaccf A Amar",
+        "-",
+        "replace: description",
+        "description: replaced description",
+        "-",
+        "add: displayName",
+        "displayName: 2",
+        "-",
+        "delete: displayName",
+        "displayName: 1",
+        "-"
     );
 
     String[] args =
     {
-      "-h", "127.0.0.1",
-      "-p", String.valueOf(TestCaseUtils.getServerLdapPort()),
-      "-D", "cn=Directory Manager",
-      "-w", "password",
-      "-f", path
+        "-h", "127.0.0.1",
+        "-p", String.valueOf(TestCaseUtils.getServerLdapPort()),
+        "-D", "cn=Directory Manager",
+        "-w", "password",
+        "-f", path
     };
 
     assertEquals(LDAPModify.mainModify(args, false, null, System.err), 0);
 
     args[9] = TestCaseUtils.createTempFile(
-         "dn: uid=user.1," + TEST_ROOT_DN_STRING,
-         "changetype: modify",
-         "replace: displayName",
-         "displayName: 2",
-         "-"
+        "dn: uid=user.1," + TEST_ROOT_DN_STRING,
+        "changetype: modify",
+        "replace: displayName",
+        "displayName: 2",
+        "-"
     );
 
     assertEquals(LDAPModify.mainModify(args, false, null, System.err), 0);
@@ -188,9 +206,47 @@
 
     // Check that encoding and decoding preserves the history information.
     EntryHistorical hist = EntryHistorical.newInstanceFromEntry(entry);
-    Attribute after = hist.encode();
+    Attribute after = hist.encodeAndPurge();
 
+    assertEquals(hist.getLastPurgedValuesCount(),0);
     assertEquals(after, before);
+
+    LDAPReplicationDomain domain = MultimasterReplication.findDomain(
+        DN.decode("uid=user.1," + TEST_ROOT_DN_STRING), null);
+    Thread.sleep(1000);
+
+    args[9] = TestCaseUtils.createTempFile(
+        "dn: uid=user.1," + TEST_ROOT_DN_STRING,
+        "changetype: modify",
+        "replace: displayName",
+        "displayName: 3",
+        "-"
+    );
+    assertEquals(LDAPModify.mainModify(args, false, null, System.err), 0);
+
+    long testPurgeDelayInMillisec = 1000; // 1 sec
+
+    // Read the entry back to get its history operational attribute.
+    entry = DirectoryServer.getEntry(dn);
+    hist = EntryHistorical.newInstanceFromEntry(entry);
+    hist.setPurgeDelay(testPurgeDelayInMillisec);
+    after = hist.encodeAndPurge();
+
+    // The purge time is not done so the hist attribute should be not empty
+    assertTrue(!after.isEmpty());
+
+    // Now wait for the purge time to be done
+    Thread.sleep(testPurgeDelayInMillisec);
+
+    // Read the entry back to get its history operational attribute.
+    // The hist attribute should now be empty since purged
+    entry = DirectoryServer.getEntry(dn);
+    hist = EntryHistorical.newInstanceFromEntry(entry);
+    hist.setPurgeDelay(testPurgeDelayInMillisec);
+    after = hist.encodeAndPurge();
+    assertTrue(after.isEmpty());
+    assertEquals(hist.getLastPurgedValuesCount(),11);
+
   }
 
   /**
@@ -263,11 +319,12 @@
     String entryuuid2 =
          attrs.get(0).iterator().next().getValue().toString();
 
+    long now = System.currentTimeMillis();
     // A change on a first server.
-    ChangeNumber t1 = new ChangeNumber(1,  0,  3);
+    ChangeNumber t1 = new ChangeNumber(now,  0,  3);
 
     // A change on a second server.
-    ChangeNumber t2 = new ChangeNumber(2,  0,  4);
+    ChangeNumber t2 = new ChangeNumber(now+1,  0,  4);
 
     // Simulate the ordering t1:add:A followed by t2:add:B that would
     // happen on one server.
@@ -294,8 +351,8 @@
     // Simulate the reverse ordering t2:add:B followed by t1:add:A that
     // would happen on the other server.
 
-    t1 = new ChangeNumber(3,  0,  3);
-    t2 = new ChangeNumber(4,  0,  4);
+    t1 = new ChangeNumber(now+3,  0,  3);
+    t2 = new ChangeNumber(now+4,  0,  4);
 
     // Replay an add of a value B at time t2 on a second server.
     attr = Attributes.create(attrType.getNormalizedPrimaryName(), "B");
@@ -326,7 +383,10 @@
     // The two values should be the first value added.
     assertEquals(attrValue1, "A");
     assertEquals(attrValue2, "A");
-  }
+
+    TestCaseUtils.deleteEntry(DN.decode("cn=test1," + TEST_ROOT_DN_STRING));
+    TestCaseUtils.deleteEntry(DN.decode("cn=test2," + TEST_ROOT_DN_STRING));
+}
 
   private static
   void publishModify(ReplicationBroker broker, ChangeNumber changeNum,
@@ -431,7 +491,9 @@
   }
 
   /**
-   *
+   * Performs a few check on the provided ADD operations, particularly
+   * that a ADDmsg can be created from it with valid values for fields
+   * DN, entryuid, ...)
    */
   private void assertFakeOperations(final DN dn1, Entry entry,
       Iterable<FakeOperation> ops, int assertCount) throws Exception
@@ -467,4 +529,175 @@
 
       assertEquals(count, assertCount);
     }
+  
+  /**
+   * Test the task that purges the replication historical stored in the user 
+   * entry.
+   * Steps :
+   * - creates entry containing historical
+   * - wait for the pruge delay
+   * - lauch the purge task
+   * - verify that all historical has been purged
+   * 
+   * TODO: another test should be written that configures the task no NOT have
+   * the time to purge everything in 1 run .. and thus to relauch it to finish
+   * the purge. And verify that the second run starts on the changeNumber where
+   * the previous task run had stopped.
+   *
+   * @throws Exception If the test fails.
+   */
+  @Test(enabled=true)
+  public void testRecurringPurgeIn1Run()
+  throws Exception
+  {
+    int entryCnt = 10;
+
+    addEntriesWithHistorical(1, entryCnt);
+
+    /*
+    // every entry should have its hist
+    try
+    {
+      // Search for matching entries in config backend
+      InternalSearchOperation op = connection.processSearch(
+          ByteString.valueOf(TEST_ROOT_DN_STRING),
+          SearchScope.WHOLE_SUBTREE,
+          LDAPFilter.decode("(ds-sync-hist=*)"));
+      assertEquals(op.getResultCode(), ResultCode.SUCCESS,
+          op.getErrorMessage().toString());
+
+      // Check that no entries have been found
+      LinkedList<SearchResultEntry> entries = op.getSearchEntries();
+      assertTrue(entries != null);
+      assertEquals(entries.size(), entryCnt);
+    } catch (Exception e)
+    {
+      fail("assertNoConfigEntriesWithFilter: could not search config backend" + e.getMessage());
+    }
+    */
+
+    // set the purge delay to 1 sec
+    TestCaseUtils.dsconfig(
+        "set-replication-domain-prop",
+        "--provider-name","Multimaster Synchronization",
+        "--domain-name",testName,
+        "--set","conflicts-historical-purge-delay:1m");
+
+    Thread.sleep(60*1000);
+
+    // launch the purge
+    Entry taskInit = TestCaseUtils.makeEntry(
+        "dn: ds-task-id=" + UUID.randomUUID() +
+        ",cn=Scheduled Tasks,cn=Tasks",
+        "objectclass: top",
+        "objectclass: ds-task",
+        "objectclass: ds-task-purge-conflicts-historical",
+        "ds-task-class-name: org.opends.server.tasks.PurgeConflictsHistoricalTask",
+        "ds-task-purge-conflicts-historical-domain-dn: "+TEST_ROOT_DN_STRING,
+    "ds-task-purge-conflicts-historical-maximum-duration: 1000"); // 1000 sec
+
+    addTask(taskInit, ResultCode.SUCCESS, null);
+
+    // every entry should be purged from its hist
+    try
+    {
+      // Search for matching entries in config backend
+      InternalSearchOperation op = connection.processSearch(
+          ByteString.valueOf(TEST_ROOT_DN_STRING),
+          SearchScope.WHOLE_SUBTREE,
+          LDAPFilter.decode("(ds-sync-hist=*)"));
+      assertEquals(op.getResultCode(), ResultCode.SUCCESS,
+          op.getErrorMessage().toString());
+
+      // Check that no entries have been found
+      LinkedList<SearchResultEntry> entries = op.getSearchEntries();
+      assertTrue(entries != null);
+      assertEquals(entries.size(), 0);
+    } catch (Exception e)
+    {
+      fail("assertNoConfigEntriesWithFilter: could not search config backend" + e.getMessage());
+    }
+  }
+
+  /**
+   * Add a provided number of generated entries containing historical.
+   * @param dnSuffix A suffix to be added to the dn
+   * @param entryCnt The number of entries to create
+   * @throws Exception
+   */
+  private void addEntriesWithHistorical(int dnSuffix, int entryCnt)
+  throws Exception
+  {
+    for (int i=0; i<entryCnt;i++)
+    {
+      String sdn =  "dn: uid=user"+i+dnSuffix+"," + TEST_ROOT_DN_STRING;
+
+        //  Add a test entry.
+        TestCaseUtils.addEntry(
+            sdn,
+            "objectClass: top",
+            "objectClass: person",
+            "objectClass: organizationalPerson",
+            "objectClass: inetOrgPerson",
+            "uid: user"+i,
+            "cn: Aaccf Amar",
+            "sn: Amar",
+            "givenName: Aaccf",
+            "userPassword: password",
+            "description: Initial description",
+            "displayName: 1"
+        );
+
+      // Modify the test entry to give it some history.
+      // Test both single and multi-valued attributes.
+
+      String path = TestCaseUtils.createTempFile(
+          sdn,
+          "changetype: modify",
+          "add: cn;lang-en",
+          "cn;lang-en: Aaccf Amar",
+          "cn;lang-en: Aaccf A Amar",
+          "-",
+          "replace: givenName",
+          "givenName: new given",
+          "-",
+          "replace: userPassword",
+          "userPassword: new pass",
+          "-",
+          "replace: description",
+          "description: replaced description",
+          "-",
+          "replace: sn",
+          "sn: replaced sn",
+          "-",
+          "add: displayName",
+          "displayName: 2",
+          "-",
+          "delete: displayName",
+          "displayName: 1",
+          "-"
+      );
+
+      String[] args =
+      {
+          "-h", "127.0.0.1",
+          "-p", String.valueOf(TestCaseUtils.getServerLdapPort()),
+          "-D", "cn=Directory Manager",
+          "-w", "password",
+          "-f", path
+      };
+
+      assertEquals(LDAPModify.mainModify(args, false, null, System.err), 0);
+
+      args[9] = TestCaseUtils.createTempFile(
+          sdn,
+          "changetype: modify",
+          "replace: displayName",
+          "displayName: 2",
+          "-"
+      );
+
+      assertEquals(LDAPModify.mainModify(args, false, null, System.err), 0);
+    }
+  }  
 }
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/ModifyConflictTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/ModifyConflictTest.java
index 5c478f2..b524506 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/ModifyConflictTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/ModifyConflictTest.java
@@ -1329,9 +1329,9 @@
      * as the initial value.
      */
     entry.removeAttribute(historicalAttrType);
-    entry.addAttribute(hist.encode(), null);
+    entry.addAttribute(hist.encodeAndPurge(), null);
     EntryHistorical hist2 = EntryHistorical.newInstanceFromEntry(entry);
-    assertEquals(hist2.encode().toString(), hist.encode().toString());
+    assertEquals(hist2.encodeAndPurge().toString(), hist.encodeAndPurge().toString());
 
     return mods;
   }

--
Gitblit v1.10.0