From 860b0be3a8040fffbc0e6fa89b066efb8f4234fb Mon Sep 17 00:00:00 2001
From: gbellato <gbellato@localhost>
Date: Wed, 28 Feb 2007 13:36:23 +0000
Subject: [PATCH] Fix for issue 517 : Configurable purge delay

---
 opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/Changelog.java                             |   65 +++++++++-
 opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/changelog/dbHandlerTest.java |  129 +++++++++++++++++++++
 opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/DbHandler.java                             |   21 +++
 opendj-sdk/opends/src/server/org/opends/server/synchronization/common/LogMessages.java                              |   82 +++++++++++++
 opendj-sdk/opends/resource/schema/02-config.ldif                                                                    |    4 
 5 files changed, 292 insertions(+), 9 deletions(-)

diff --git a/opendj-sdk/opends/resource/schema/02-config.ldif b/opendj-sdk/opends/resource/schema/02-config.ldif
index a7d8860..403c034 100644
--- a/opendj-sdk/opends/resource/schema/02-config.ldif
+++ b/opendj-sdk/opends/resource/schema/02-config.ldif
@@ -1080,6 +1080,10 @@
   NAME 'ds-cfg-certificate-fingerprint-algorithm'
   SYNTAX 1.3.6.1.4.1.1466.115.121.1.15 SINGLE-VALUE
   X-ORIGIN 'OpenDS Directory Server' )
+attributeTypes: ( 1.3.6.1.4.1.26027.1.1.319
+  NAME 'ds-cfg-changelog-purge-delay'
+  SYNTAX 1.3.6.1.4.1.1466.115.121.1.27 SINGLE-VALUE
+  X-ORIGIN 'OpenDS Directory Server' ) 
 objectClasses: ( 1.3.6.1.4.1.26027.1.2.1
   NAME 'ds-cfg-access-control-handler' SUP top STRUCTURAL
   MUST ( cn $ ds-cfg-acl-handler-class $ ds-cfg-acl-handler-enabled )
diff --git a/opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/Changelog.java b/opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/Changelog.java
index 1b61fec..300bda3 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/Changelog.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/Changelog.java
@@ -29,6 +29,7 @@
 import static org.opends.server.loggers.Error.logError;
 import static org.opends.server.messages.MessageHandler.getMessage;
 import static org.opends.server.synchronization.common.LogMessages.*;
+import static org.opends.server.util.ServerConstants.*;
 import static org.opends.server.util.StaticUtils.getFileForPath;
 
 import java.io.File;
@@ -40,6 +41,7 @@
 import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Set;
 
@@ -49,6 +51,7 @@
 import org.opends.server.config.ConfigEntry;
 import org.opends.server.config.ConfigException;
 import org.opends.server.config.IntegerConfigAttribute;
+import org.opends.server.config.IntegerWithUnitConfigAttribute;
 import org.opends.server.config.StringConfigAttribute;
 import org.opends.server.core.DirectoryServer;
 import org.opends.server.messages.MessageHandler;
@@ -100,6 +103,8 @@
   private int rcvWindow;
   private int queueSize;
   private String dbDirname = null;
+  private long trimAge; // the time (in sec) after which the  changes must
+                        // de deleted from the persistent storage.
 
   static final String CHANGELOG_SERVER_ATTR = "ds-cfg-changelog-server";
   static final String SERVER_ID_ATTR = "ds-cfg-changelog-server-id";
@@ -107,11 +112,12 @@
   static final String WINDOW_SIZE_ATTR = "ds-cfg-window-size";
   static final String QUEUE_SIZE_ATTR = "ds-cfg-changelog-max-queue-size";
   static final String CHANGELOG_DIR_PATH_ATTR = "ds-cfg-changelog-db-dirname";
+  static final String PURGE_DELAY_ATTR = "ds-cfg-changelog-purge-delay";
+
 
   static final IntegerConfigAttribute changelogPortStub =
     new IntegerConfigAttribute(CHANGELOG_PORT_ATTR, "changelog port",
-      true, false, false, true, 0,
-      true, 65535);
+      true, false, false, true, 0, true, 65535);
 
   static final IntegerConfigAttribute serverIdStub =
     new IntegerConfigAttribute(SERVER_ID_ATTR, "server ID", true, false,
@@ -119,8 +125,7 @@
 
   static final StringConfigAttribute changelogStub =
     new StringConfigAttribute(CHANGELOG_SERVER_ATTR,
-        "changelog server information", true,
-        true, false);
+        "changelog server information", true, true, false);
 
   static final IntegerConfigAttribute windowStub =
     new IntegerConfigAttribute(WINDOW_SIZE_ATTR, "window size",
@@ -132,8 +137,30 @@
 
   static final StringConfigAttribute dbDirnameStub =
     new StringConfigAttribute(CHANGELOG_DIR_PATH_ATTR,
-        "changelog storage directory path", false,
-        false, true);
+        "changelog storage directory path", false, false, true);
+
+  /**
+   * The set of time units that will be used for expressing the
+   * changelog purge delay.
+   */
+  private static final LinkedHashMap<String,Double> purgeTimeUnits =
+       new LinkedHashMap<String,Double>();
+
+  static
+  {
+    purgeTimeUnits.put(TIME_UNIT_SECONDS_ABBR, 1D);
+    purgeTimeUnits.put(TIME_UNIT_SECONDS_FULL, 1D);
+    purgeTimeUnits.put(TIME_UNIT_MINUTES_ABBR, 60D);
+    purgeTimeUnits.put(TIME_UNIT_MINUTES_FULL, 1D);
+    purgeTimeUnits.put(TIME_UNIT_HOURS_ABBR, 60*60D);
+    purgeTimeUnits.put(TIME_UNIT_HOURS_FULL, 60*60D);
+    purgeTimeUnits.put(TIME_UNIT_DAYS_ABBR, 24*60*60D);
+    purgeTimeUnits.put(TIME_UNIT_DAYS_FULL, 24*60*60D);
+  }
+
+  static final IntegerWithUnitConfigAttribute purgeDelayStub =
+    new IntegerWithUnitConfigAttribute(PURGE_DELAY_ATTR,
+        "changelog purge delay", false, purgeTimeUnits, true, 0, false, 0);
 
   /**
    * Check if a ConfigEntry is valid.
@@ -299,6 +326,20 @@
           e.getMessage() + " " + getFileForPath(dbDirname));
     }
 
+    /*
+     * Read the Purge Delay (trim age) attribute
+     */
+    IntegerWithUnitConfigAttribute purgeDelayAttr =
+      (IntegerWithUnitConfigAttribute) config.getConfigAttribute(
+          purgeDelayStub);
+    if (purgeDelayAttr == null)
+      trimAge = 24*60*60;  // not present : use the default value : 1 day
+    else
+    {
+      trimAge = purgeDelayAttr.activeCalculatedValue();
+      configAttributes.add(purgeDelayAttr);
+    }
+
     initialize(changelogServerId, changelogPort);
 
     configDn = config.getDN();
@@ -606,4 +647,16 @@
   {
     return new DbHandler(id, baseDn, this, dbEnv);
   }
+
+  /**
+   * Retrieves the time after which changes must be deleted from the
+   * persistent storage (in milliseconds).
+   *
+   * @return  The time after which changes must be deleted from the
+   *          persistent storage (in milliseconds).
+   */
+  public long getTrimage()
+  {
+    return trimAge * 1000;
+  }
 }
diff --git a/opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/DbHandler.java b/opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/DbHandler.java
index 90132ac..ebd5d63 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/DbHandler.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/DbHandler.java
@@ -83,13 +83,17 @@
   private boolean done = false;
   private DirectoryThread thread = null;
   private Object flushLock = new Object();
+  /**
+   * the trim age in milliseconds.
+   */
+  private long trimage;
 
   /**
    * Creates a New dbHandler associated to a given LDAP server.
    *
    * @param id Identifier of the DB.
-   * @param baseDn of the DB.
-   * @param changelog the Changelog that creates this dbHandler.
+   * @param baseDn the baseDn for which this DB was created.
+   * @param changelog The Changelog that creates this dbHandler.
    * @param dbenv the Database Env to use to create the Changelog DB.
    * @throws DatabaseException If a database problem happened
    */
@@ -99,6 +103,7 @@
   {
     this.serverId = id;
     this.baseDn = baseDn;
+    this.trimage = changelog.getTrimage();
     db = new ChangelogDB(id, baseDn, changelog, dbenv);
     firstChange = db.readFirstChange();
     lastChange = db.readLastChange();
@@ -337,9 +342,10 @@
    */
   private void trim() throws DatabaseException, Exception
   {
+    if (trimage == 0)
+      return;
     int size = 0;
     boolean finished = false;
-    int trimage = 24*60*60*1000; // TODO : make trim-age a config parameter
     ChangeNumber trimDate = new ChangeNumber(TimeThread.getTime() - trimage,
         (short) 0, (short)0);
 
@@ -493,4 +499,13 @@
     return(baseDn + " " + serverId + " " + firstChange + " " + lastChange);
   }
 
+  /**
+   * Set the Purge delay for this db Handler.
+   * @param delay The purge delay in Milliseconds.
+   */
+  public void setPurgeDelay(long delay)
+  {
+    trimage = delay;
+  }
+
 }
diff --git a/opendj-sdk/opends/src/server/org/opends/server/synchronization/common/LogMessages.java b/opendj-sdk/opends/src/server/org/opends/server/synchronization/common/LogMessages.java
index b592b88..3998104 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/synchronization/common/LogMessages.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/synchronization/common/LogMessages.java
@@ -274,6 +274,58 @@
     CATEGORY_MASK_SYNC | SEVERITY_MASK_MILD_ERROR | 36;
 
   /**
+   * The message ID for the description of the attribute used to specify the
+   * list of other Changelog Servers in the Changelog Server
+   * Configuration.
+   */
+  public static final int MSGID_CHANGELOG_SERVER_ATTR =
+    CATEGORY_MASK_CORE | SEVERITY_MASK_INFORMATIONAL | 37;
+
+  /**
+   * The message ID for the description of the attribute used to specify
+   * the identifier of the Changelog Server.
+   */
+  public static final int MSGID_SERVER_ID_ATTR =
+    CATEGORY_MASK_CORE | SEVERITY_MASK_INFORMATIONAL | 38;
+
+  /**
+   * The message id for the description of the attribute used to specify
+   * the port number of the Changelog Server.
+   */
+  public static final int MSGID_CHANGELOG_PORT_ATTR =
+    CATEGORY_MASK_CORE | SEVERITY_MASK_INFORMATIONAL | 39;
+
+  /**
+   * The message id for the description of the attribute used to specify
+   * the receive Window Size used by a Changelog Server.
+   */
+  public static final int MSGID_WINDOW_SIZE_ATTR =
+    CATEGORY_MASK_CORE | SEVERITY_MASK_INFORMATIONAL | 40;
+
+  /**
+   * The message id for thedescription of the  attribute used to specify
+   * the maximum queue size used by a Changelog Server.
+   */
+  public static final int MSGID_QUEUE_SIZE_ATTR =
+    CATEGORY_MASK_CORE | SEVERITY_MASK_INFORMATIONAL | 41;
+
+  /**
+   * The message id for the Attribute used to specify the directory where the
+   * persistent storage of the Changelog server will be saved.
+   */
+  public static final int MSGID_CHANGELOG_DIR_PATH_ATTR =
+    CATEGORY_MASK_CORE | SEVERITY_MASK_INFORMATIONAL | 42;
+
+  /**
+   * The message id for thedescription of the attribute used to configure
+   * the purge delay of the Changelog Servers.
+   */
+  public static final int MSGID_PURGE_DELAY_ATTR =
+    CATEGORY_MASK_CORE | SEVERITY_MASK_INFORMATIONAL | 43;
+
+
+
+  /**
    * Register the messages from this class in the core server.
    *
    */
@@ -367,5 +419,35 @@
     MessageHandler.registerMessage(MSGID_FILE_CHECK_CREATE_FAILED,
         "An Exception was caught while testing existence or trying " +
         " to create the directory for the changelog database : %s");
+    MessageHandler.registerMessage(MSGID_CHANGELOG_SERVER_ATTR,
+        "Specifies the list of Changelog Servers to which this" +
+        " Changelog Server should connect. Each value of this attribute" +
+        " should contain a values build with the hostname and the port" +
+        " number of the remote server separated with a \":\"");
+    MessageHandler.registerMessage(MSGID_SERVER_ID_ATTR,
+        "Specifies the server ID. Each Changelog Server in the topology" +
+        " Must be assigned a unique server ID in the topology.");
+    MessageHandler.registerMessage(MSGID_CHANGELOG_PORT_ATTR,
+        "Specifies the port number that the changelog server will use to" +
+        " listen for connections from LDAP servers.");
+    MessageHandler.registerMessage(MSGID_WINDOW_SIZE_ATTR,
+        "Specifies the receive window size of the changelog server.");
+    MessageHandler.registerMessage(MSGID_QUEUE_SIZE_ATTR,
+        "Specifies the receive queue size of the changelog server." +
+        " The Changelog servers will queue up to this number of messages" +
+        " in its memory queue and save the older messages to persistent" +
+        " storage. Using a larger size may improve performances when" +
+        " The synchronization delay is larger than this size but at the cost" +
+        " of using more memory.");
+    MessageHandler.registerMessage(MSGID_CHANGELOG_DIR_PATH_ATTR,
+        "Specifies the Changelog Server directory. The Changelog server" +
+        " will create all persistent storage below this path.");
+    MessageHandler.registerMessage(MSGID_PURGE_DELAY_ATTR,
+        "Specifies the Changelog Purge Delay, The Changelog servers will" +
+        " keep all changes up to this amount of time before deleting them." +
+        " This values defines the maximum age of a backup that can be" +
+        " restored because changelog servers would not be able to refresh" +
+        " LDAP servers with older versions of the data. A zero value" +
+        " can be used to specify an infinite delay (or never purge).");
   }
 }
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/changelog/dbHandlerTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/changelog/dbHandlerTest.java
new file mode 100644
index 0000000..5590012
--- /dev/null
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/changelog/dbHandlerTest.java
@@ -0,0 +1,129 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License").  You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE
+ * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
+ * add the following below this CDDL HEADER, with the fields enclosed
+ * by brackets "[]" replaced with your own identifying * information:
+ *      Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ *      Portions Copyright 2006 Sun Microsystems, Inc.
+ */
+package org.opends.server.synchronization.changelog;
+
+import java.io.File;
+import java.net.ServerSocket;
+
+import org.opends.server.TestCaseUtils;
+import org.opends.server.config.ConfigEntry;
+import org.opends.server.synchronization.SynchronizationTestCase;
+import org.opends.server.synchronization.common.ChangeNumber;
+import org.opends.server.synchronization.common.ChangeNumberGenerator;
+import org.opends.server.synchronization.protocol.DeleteMsg;
+import org.opends.server.types.DN;
+import org.opends.server.types.Entry;
+import org.testng.annotations.Test;
+import static org.testng.Assert.*;
+
+/**
+ * Test the dbHandler class
+ */
+public class dbHandlerTest extends SynchronizationTestCase
+{
+  @Test()
+  void testDbHandlerTrim() throws Exception
+  {
+    TestCaseUtils.startServer();
+
+    //  find  a free port for the changelog server
+    ServerSocket socket = TestCaseUtils.bindFreePort();
+    int changelogPort = socket.getLocalPort();
+    socket.close();
+
+    // configure a Changelog server.
+    String changelogLdif =
+      "dn: cn=Changelog Server\n"
+        + "objectClass: top\n"
+        + "objectClass: ds-cfg-synchronization-changelog-server-config\n"
+        + "cn: Changelog Server\n"
+        + "ds-cfg-changelog-port: "+ changelogPort + "\n"
+        + "ds-cfg-changelog-server-id: 2\n"
+        + "ds-cfg-changelog-purge-delay: 0 d";
+    Entry tmp = TestCaseUtils.entryFromLdifString(changelogLdif);
+    ConfigEntry changelogConfig = new ConfigEntry(tmp, null);
+    Changelog changelog = new Changelog(changelogConfig);
+
+    // create or clean a directory for the dbHandler
+    String buildRoot = System.getProperty(TestCaseUtils.PROPERTY_BUILD_ROOT);
+    String path = buildRoot + File.separator + "dbHandler";
+    File testRoot = new File(path);
+    if (testRoot.exists())
+    {
+      TestCaseUtils.deleteDirectory(testRoot);
+    }
+    testRoot.mkdirs();
+
+    ChangelogDbEnv dbEnv = new ChangelogDbEnv(path, changelog);
+
+    DbHandler handler =
+      new DbHandler((short) 1, DN.decode("o=test"), changelog, dbEnv);
+    
+    ChangeNumberGenerator gen = new ChangeNumberGenerator((short)1, 0);
+    ChangeNumber changeNumber1 = gen.NewChangeNumber();
+    ChangeNumber changeNumber2 = gen.NewChangeNumber();
+    ChangeNumber changeNumber3 = gen.NewChangeNumber();
+    
+    DeleteMsg update1 = new DeleteMsg("o=test", changeNumber1, "uid"); 
+    DeleteMsg update2 = new DeleteMsg("o=test", changeNumber2, "uid"); 
+    DeleteMsg update3 = new DeleteMsg("o=test", changeNumber3, "uid"); 
+    
+    handler.add(update1);
+    handler.add(update2);
+    handler.add(update3);
+    
+    // The ChangeNumber should not get purged
+    assertEquals(changeNumber1, handler.getFirstChange());
+    assertEquals(changeNumber3, handler.getLastChange());
+    
+    handler.setPurgeDelay(1);
+    
+    boolean purged = false;
+    int count = 300;  // wait at most 60 seconds
+    while (!purged && (count>0))
+    {
+      ChangeNumber firstChange = handler.getFirstChange();
+      ChangeNumber lastChange = handler.getLastChange();
+      if ((!firstChange.equals(changeNumber3) ||
+          (!lastChange.equals(changeNumber3))))
+      {
+        TestCaseUtils.sleep(100);
+      }
+      else
+      {
+        purged = true;
+      }
+    }
+    
+    handler.shutdown();
+    dbEnv.shutdown();
+    changelog.shutdown();
+    
+    TestCaseUtils.deleteDirectory(testRoot);
+  }
+
+}

--
Gitblit v1.10.0