From b9aad30c9e07b179a2c22fad830f6a54b8993bc9 Mon Sep 17 00:00:00 2001
From: ludovicp <ludovicp@localhost>
Date: Wed, 18 Aug 2010 13:27:32 +0000
Subject: [PATCH] First pass on solving issue #514, reducing Replication meta data, especially aged information. The default is to keep the replication meta data (historical information) for at least 1 day. Purging occurs on the fly when entries are modified, or via a task. Launching the task will be available through dsreplication in separate commit. 

---
 opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/HistoricalTest.java |  329 ++++++++++++++++++++++++++++++++++++++++++++++--------
 1 files changed, 281 insertions(+), 48 deletions(-)

diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/HistoricalTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/HistoricalTest.java
index 7ccdf26..dd16601 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/HistoricalTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/HistoricalTest.java
@@ -34,28 +34,44 @@
 import org.opends.server.replication.common.ChangeNumber;
 import org.opends.server.replication.protocol.AddMsg;
 import org.opends.server.replication.protocol.ModifyMsg;
+import org.opends.server.schema.DirectoryStringSyntax;
 import org.opends.server.TestCaseUtils;
 import org.opends.server.protocols.internal.InternalClientConnection;
+import org.opends.server.protocols.internal.InternalSearchOperation;
+import org.opends.server.protocols.ldap.LDAPFilter;
 import org.opends.server.tools.LDAPModify;
 import org.opends.server.types.AbstractOperation;
 import org.opends.server.types.Attributes;
+import org.opends.server.types.ByteString;
 import org.opends.server.types.DN;
 import org.opends.server.types.Entry;
 import org.opends.server.types.Attribute;
 import org.opends.server.types.Modification;
 import org.opends.server.types.ModificationType;
 import org.opends.server.types.AttributeType;
+import org.opends.server.types.ResultCode;
+import org.opends.server.types.SearchResultEntry;
+import org.opends.server.types.SearchScope;
+import org.opends.server.types.operation.PluginOperation;
+import org.opends.server.backends.task.TaskBackend;
+import org.opends.server.backends.task.TaskState;
 import org.opends.server.core.DirectoryServer;
 import org.testng.annotations.Test;
 import org.testng.annotations.BeforeClass;
 
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+import static org.opends.messages.ReplicationMessages.ERR_NO_MATCHING_DOMAIN;
 import static org.opends.server.TestCaseUtils.*;
+import static org.opends.server.config.ConfigConstants.ATTR_TASK_CONFLICTS_HIST_PURGE_COUNT;
+import static org.opends.server.config.ConfigConstants.ATTR_TASK_CONFLICTS_HIST_PURGE_COMPLETED_IN_TIME;
 
 import java.net.ServerSocket;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.ArrayList;
+import java.util.UUID;
 
 /**
  * Tests the Historical class.
@@ -64,6 +80,7 @@
      extends ReplicationTestCase
 {
   private int replServerPort;
+  String testName = "historicalTest";
 
   /**
    * Set up replication on the test backend.
@@ -96,7 +113,6 @@
     replServerEntry = TestCaseUtils.entryFromLdifString(replServerLdif);
 
     // The suffix to be synchronized.
-    String testName = "historicalTest";
     String synchroServerStringDN = "cn=" + testName + ", cn=domains, " +
       SYNCHRO_PLUGIN_DN;
     String synchroServerLdif = "dn: " + synchroServerStringDN + "\n"
@@ -115,66 +131,68 @@
   /**
    * Tests that the attribute modification history is correctly read from
    * and written to an operational attribute of the entry.
+   * Also test that historical is purged according to the purge delay that
+   * is provided.
    * @throws Exception If the test fails.
    */
-  @Test
-  public void testEncoding()
+  @Test(enabled=true)
+  public void testEncodingAndPurge()
        throws Exception
   {
     //  Add a test entry.
     TestCaseUtils.addEntry(
-         "dn: uid=user.1," + TEST_ROOT_DN_STRING,
-         "objectClass: top",
-         "objectClass: person",
-         "objectClass: organizationalPerson",
-         "objectClass: inetOrgPerson",
-         "uid: user.1",
-         "cn: Aaccf Amar",
-         "sn: Amar",
-         "givenName: Aaccf",
-         "userPassword: password",
-         "description: Initial description",
-         "displayName: 1"
-       );
+        "dn: uid=user.1," + TEST_ROOT_DN_STRING,
+        "objectClass: top",
+        "objectClass: person",
+        "objectClass: organizationalPerson",
+        "objectClass: inetOrgPerson",
+        "uid: user.1",
+        "cn: Aaccf Amar",
+        "sn: Amar",
+        "givenName: Aaccf",
+        "userPassword: password",
+        "description: Initial description",
+        "displayName: 1"
+    );
 
     // Modify the test entry to give it some history.
     // Test both single and multi-valued attributes.
 
     String path = TestCaseUtils.createTempFile(
-         "dn: uid=user.1," + TEST_ROOT_DN_STRING,
-         "changetype: modify",
-         "add: cn;lang-en",
-         "cn;lang-en: Aaccf Amar",
-         "cn;lang-en: Aaccf A Amar",
-         "-",
-         "replace: description",
-         "description: replaced description",
-         "-",
-         "add: displayName",
-         "displayName: 2",
-         "-",
-         "delete: displayName",
-         "displayName: 1",
-         "-"
+        "dn: uid=user.1," + TEST_ROOT_DN_STRING,
+        "changetype: modify",
+        "add: cn;lang-en",
+        "cn;lang-en: Aaccf Amar",
+        "cn;lang-en: Aaccf A Amar",
+        "-",
+        "replace: description",
+        "description: replaced description",
+        "-",
+        "add: displayName",
+        "displayName: 2",
+        "-",
+        "delete: displayName",
+        "displayName: 1",
+        "-"
     );
 
     String[] args =
     {
-      "-h", "127.0.0.1",
-      "-p", String.valueOf(TestCaseUtils.getServerLdapPort()),
-      "-D", "cn=Directory Manager",
-      "-w", "password",
-      "-f", path
+        "-h", "127.0.0.1",
+        "-p", String.valueOf(TestCaseUtils.getServerLdapPort()),
+        "-D", "cn=Directory Manager",
+        "-w", "password",
+        "-f", path
     };
 
     assertEquals(LDAPModify.mainModify(args, false, null, System.err), 0);
 
     args[9] = TestCaseUtils.createTempFile(
-         "dn: uid=user.1," + TEST_ROOT_DN_STRING,
-         "changetype: modify",
-         "replace: displayName",
-         "displayName: 2",
-         "-"
+        "dn: uid=user.1," + TEST_ROOT_DN_STRING,
+        "changetype: modify",
+        "replace: displayName",
+        "displayName: 2",
+        "-"
     );
 
     assertEquals(LDAPModify.mainModify(args, false, null, System.err), 0);
@@ -188,9 +206,47 @@
 
     // Check that encoding and decoding preserves the history information.
     EntryHistorical hist = EntryHistorical.newInstanceFromEntry(entry);
-    Attribute after = hist.encode();
+    Attribute after = hist.encodeAndPurge();
 
+    assertEquals(hist.getLastPurgedValuesCount(),0);
     assertEquals(after, before);
+
+    LDAPReplicationDomain domain = MultimasterReplication.findDomain(
+        DN.decode("uid=user.1," + TEST_ROOT_DN_STRING), null);
+    Thread.sleep(1000);
+
+    args[9] = TestCaseUtils.createTempFile(
+        "dn: uid=user.1," + TEST_ROOT_DN_STRING,
+        "changetype: modify",
+        "replace: displayName",
+        "displayName: 3",
+        "-"
+    );
+    assertEquals(LDAPModify.mainModify(args, false, null, System.err), 0);
+
+    long testPurgeDelayInMillisec = 1000; // 1 sec
+
+    // Read the entry back to get its history operational attribute.
+    entry = DirectoryServer.getEntry(dn);
+    hist = EntryHistorical.newInstanceFromEntry(entry);
+    hist.setPurgeDelay(testPurgeDelayInMillisec);
+    after = hist.encodeAndPurge();
+
+    // The purge time is not done so the hist attribute should be not empty
+    assertTrue(!after.isEmpty());
+
+    // Now wait for the purge time to be done
+    Thread.sleep(testPurgeDelayInMillisec);
+
+    // Read the entry back to get its history operational attribute.
+    // The hist attribute should now be empty since purged
+    entry = DirectoryServer.getEntry(dn);
+    hist = EntryHistorical.newInstanceFromEntry(entry);
+    hist.setPurgeDelay(testPurgeDelayInMillisec);
+    after = hist.encodeAndPurge();
+    assertTrue(after.isEmpty());
+    assertEquals(hist.getLastPurgedValuesCount(),11);
+
   }
 
   /**
@@ -263,11 +319,12 @@
     String entryuuid2 =
          attrs.get(0).iterator().next().getValue().toString();
 
+    long now = System.currentTimeMillis();
     // A change on a first server.
-    ChangeNumber t1 = new ChangeNumber(1,  0,  3);
+    ChangeNumber t1 = new ChangeNumber(now,  0,  3);
 
     // A change on a second server.
-    ChangeNumber t2 = new ChangeNumber(2,  0,  4);
+    ChangeNumber t2 = new ChangeNumber(now+1,  0,  4);
 
     // Simulate the ordering t1:add:A followed by t2:add:B that would
     // happen on one server.
@@ -294,8 +351,8 @@
     // Simulate the reverse ordering t2:add:B followed by t1:add:A that
     // would happen on the other server.
 
-    t1 = new ChangeNumber(3,  0,  3);
-    t2 = new ChangeNumber(4,  0,  4);
+    t1 = new ChangeNumber(now+3,  0,  3);
+    t2 = new ChangeNumber(now+4,  0,  4);
 
     // Replay an add of a value B at time t2 on a second server.
     attr = Attributes.create(attrType.getNormalizedPrimaryName(), "B");
@@ -326,7 +383,10 @@
     // The two values should be the first value added.
     assertEquals(attrValue1, "A");
     assertEquals(attrValue2, "A");
-  }
+
+    TestCaseUtils.deleteEntry(DN.decode("cn=test1," + TEST_ROOT_DN_STRING));
+    TestCaseUtils.deleteEntry(DN.decode("cn=test2," + TEST_ROOT_DN_STRING));
+}
 
   private static
   void publishModify(ReplicationBroker broker, ChangeNumber changeNum,
@@ -431,7 +491,9 @@
   }
 
   /**
-   *
+   * Performs a few check on the provided ADD operations, particularly
+   * that a ADDmsg can be created from it with valid values for fields
+   * DN, entryuid, ...)
    */
   private void assertFakeOperations(final DN dn1, Entry entry,
       Iterable<FakeOperation> ops, int assertCount) throws Exception
@@ -467,4 +529,175 @@
 
       assertEquals(count, assertCount);
     }
+  
+  /**
+   * Test the task that purges the replication historical stored in the user 
+   * entry.
+   * Steps :
+   * - creates entry containing historical
+   * - wait for the pruge delay
+   * - lauch the purge task
+   * - verify that all historical has been purged
+   * 
+   * TODO: another test should be written that configures the task no NOT have
+   * the time to purge everything in 1 run .. and thus to relauch it to finish
+   * the purge. And verify that the second run starts on the changeNumber where
+   * the previous task run had stopped.
+   *
+   * @throws Exception If the test fails.
+   */
+  @Test(enabled=true)
+  public void testRecurringPurgeIn1Run()
+  throws Exception
+  {
+    int entryCnt = 10;
+
+    addEntriesWithHistorical(1, entryCnt);
+
+    /*
+    // every entry should have its hist
+    try
+    {
+      // Search for matching entries in config backend
+      InternalSearchOperation op = connection.processSearch(
+          ByteString.valueOf(TEST_ROOT_DN_STRING),
+          SearchScope.WHOLE_SUBTREE,
+          LDAPFilter.decode("(ds-sync-hist=*)"));
+      assertEquals(op.getResultCode(), ResultCode.SUCCESS,
+          op.getErrorMessage().toString());
+
+      // Check that no entries have been found
+      LinkedList<SearchResultEntry> entries = op.getSearchEntries();
+      assertTrue(entries != null);
+      assertEquals(entries.size(), entryCnt);
+    } catch (Exception e)
+    {
+      fail("assertNoConfigEntriesWithFilter: could not search config backend" + e.getMessage());
+    }
+    */
+
+    // set the purge delay to 1 sec
+    TestCaseUtils.dsconfig(
+        "set-replication-domain-prop",
+        "--provider-name","Multimaster Synchronization",
+        "--domain-name",testName,
+        "--set","conflicts-historical-purge-delay:1m");
+
+    Thread.sleep(60*1000);
+
+    // launch the purge
+    Entry taskInit = TestCaseUtils.makeEntry(
+        "dn: ds-task-id=" + UUID.randomUUID() +
+        ",cn=Scheduled Tasks,cn=Tasks",
+        "objectclass: top",
+        "objectclass: ds-task",
+        "objectclass: ds-task-purge-conflicts-historical",
+        "ds-task-class-name: org.opends.server.tasks.PurgeConflictsHistoricalTask",
+        "ds-task-purge-conflicts-historical-domain-dn: "+TEST_ROOT_DN_STRING,
+    "ds-task-purge-conflicts-historical-maximum-duration: 1000"); // 1000 sec
+
+    addTask(taskInit, ResultCode.SUCCESS, null);
+
+    // every entry should be purged from its hist
+    try
+    {
+      // Search for matching entries in config backend
+      InternalSearchOperation op = connection.processSearch(
+          ByteString.valueOf(TEST_ROOT_DN_STRING),
+          SearchScope.WHOLE_SUBTREE,
+          LDAPFilter.decode("(ds-sync-hist=*)"));
+      assertEquals(op.getResultCode(), ResultCode.SUCCESS,
+          op.getErrorMessage().toString());
+
+      // Check that no entries have been found
+      LinkedList<SearchResultEntry> entries = op.getSearchEntries();
+      assertTrue(entries != null);
+      assertEquals(entries.size(), 0);
+    } catch (Exception e)
+    {
+      fail("assertNoConfigEntriesWithFilter: could not search config backend" + e.getMessage());
+    }
+  }
+
+  /**
+   * Add a provided number of generated entries containing historical.
+   * @param dnSuffix A suffix to be added to the dn
+   * @param entryCnt The number of entries to create
+   * @throws Exception
+   */
+  private void addEntriesWithHistorical(int dnSuffix, int entryCnt)
+  throws Exception
+  {
+    for (int i=0; i<entryCnt;i++)
+    {
+      String sdn =  "dn: uid=user"+i+dnSuffix+"," + TEST_ROOT_DN_STRING;
+
+        //  Add a test entry.
+        TestCaseUtils.addEntry(
+            sdn,
+            "objectClass: top",
+            "objectClass: person",
+            "objectClass: organizationalPerson",
+            "objectClass: inetOrgPerson",
+            "uid: user"+i,
+            "cn: Aaccf Amar",
+            "sn: Amar",
+            "givenName: Aaccf",
+            "userPassword: password",
+            "description: Initial description",
+            "displayName: 1"
+        );
+
+      // Modify the test entry to give it some history.
+      // Test both single and multi-valued attributes.
+
+      String path = TestCaseUtils.createTempFile(
+          sdn,
+          "changetype: modify",
+          "add: cn;lang-en",
+          "cn;lang-en: Aaccf Amar",
+          "cn;lang-en: Aaccf A Amar",
+          "-",
+          "replace: givenName",
+          "givenName: new given",
+          "-",
+          "replace: userPassword",
+          "userPassword: new pass",
+          "-",
+          "replace: description",
+          "description: replaced description",
+          "-",
+          "replace: sn",
+          "sn: replaced sn",
+          "-",
+          "add: displayName",
+          "displayName: 2",
+          "-",
+          "delete: displayName",
+          "displayName: 1",
+          "-"
+      );
+
+      String[] args =
+      {
+          "-h", "127.0.0.1",
+          "-p", String.valueOf(TestCaseUtils.getServerLdapPort()),
+          "-D", "cn=Directory Manager",
+          "-w", "password",
+          "-f", path
+      };
+
+      assertEquals(LDAPModify.mainModify(args, false, null, System.err), 0);
+
+      args[9] = TestCaseUtils.createTempFile(
+          sdn,
+          "changetype: modify",
+          "replace: displayName",
+          "displayName: 2",
+          "-"
+      );
+
+      assertEquals(LDAPModify.mainModify(args, false, null, System.err), 0);
+    }
+  }  
 }

--
Gitblit v1.10.0