From 860b0be3a8040fffbc0e6fa89b066efb8f4234fb Mon Sep 17 00:00:00 2001
From: gbellato <gbellato@localhost>
Date: Wed, 28 Feb 2007 13:36:23 +0000
Subject: [PATCH] Fix for issue 517 : Configurable purge delay
---
opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/Changelog.java | 65 +++++++++-
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/changelog/dbHandlerTest.java | 129 +++++++++++++++++++++
opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/DbHandler.java | 21 +++
opendj-sdk/opends/src/server/org/opends/server/synchronization/common/LogMessages.java | 82 +++++++++++++
opendj-sdk/opends/resource/schema/02-config.ldif | 4
5 files changed, 292 insertions(+), 9 deletions(-)
diff --git a/opendj-sdk/opends/resource/schema/02-config.ldif b/opendj-sdk/opends/resource/schema/02-config.ldif
index a7d8860..403c034 100644
--- a/opendj-sdk/opends/resource/schema/02-config.ldif
+++ b/opendj-sdk/opends/resource/schema/02-config.ldif
@@ -1080,6 +1080,10 @@
NAME 'ds-cfg-certificate-fingerprint-algorithm'
SYNTAX 1.3.6.1.4.1.1466.115.121.1.15 SINGLE-VALUE
X-ORIGIN 'OpenDS Directory Server' )
+attributeTypes: ( 1.3.6.1.4.1.26027.1.1.319
+ NAME 'ds-cfg-changelog-purge-delay'
+ SYNTAX 1.3.6.1.4.1.1466.115.121.1.27 SINGLE-VALUE
+ X-ORIGIN 'OpenDS Directory Server' )
objectClasses: ( 1.3.6.1.4.1.26027.1.2.1
NAME 'ds-cfg-access-control-handler' SUP top STRUCTURAL
MUST ( cn $ ds-cfg-acl-handler-class $ ds-cfg-acl-handler-enabled )
diff --git a/opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/Changelog.java b/opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/Changelog.java
index 1b61fec..300bda3 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/Changelog.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/Changelog.java
@@ -29,6 +29,7 @@
import static org.opends.server.loggers.Error.logError;
import static org.opends.server.messages.MessageHandler.getMessage;
import static org.opends.server.synchronization.common.LogMessages.*;
+import static org.opends.server.util.ServerConstants.*;
import static org.opends.server.util.StaticUtils.getFileForPath;
import java.io.File;
@@ -40,6 +41,7 @@
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Set;
@@ -49,6 +51,7 @@
import org.opends.server.config.ConfigEntry;
import org.opends.server.config.ConfigException;
import org.opends.server.config.IntegerConfigAttribute;
+import org.opends.server.config.IntegerWithUnitConfigAttribute;
import org.opends.server.config.StringConfigAttribute;
import org.opends.server.core.DirectoryServer;
import org.opends.server.messages.MessageHandler;
@@ -100,6 +103,8 @@
private int rcvWindow;
private int queueSize;
private String dbDirname = null;
+ private long trimAge; // the time (in sec) after which the changes must
+ // de deleted from the persistent storage.
static final String CHANGELOG_SERVER_ATTR = "ds-cfg-changelog-server";
static final String SERVER_ID_ATTR = "ds-cfg-changelog-server-id";
@@ -107,11 +112,12 @@
static final String WINDOW_SIZE_ATTR = "ds-cfg-window-size";
static final String QUEUE_SIZE_ATTR = "ds-cfg-changelog-max-queue-size";
static final String CHANGELOG_DIR_PATH_ATTR = "ds-cfg-changelog-db-dirname";
+ static final String PURGE_DELAY_ATTR = "ds-cfg-changelog-purge-delay";
+
static final IntegerConfigAttribute changelogPortStub =
new IntegerConfigAttribute(CHANGELOG_PORT_ATTR, "changelog port",
- true, false, false, true, 0,
- true, 65535);
+ true, false, false, true, 0, true, 65535);
static final IntegerConfigAttribute serverIdStub =
new IntegerConfigAttribute(SERVER_ID_ATTR, "server ID", true, false,
@@ -119,8 +125,7 @@
static final StringConfigAttribute changelogStub =
new StringConfigAttribute(CHANGELOG_SERVER_ATTR,
- "changelog server information", true,
- true, false);
+ "changelog server information", true, true, false);
static final IntegerConfigAttribute windowStub =
new IntegerConfigAttribute(WINDOW_SIZE_ATTR, "window size",
@@ -132,8 +137,30 @@
static final StringConfigAttribute dbDirnameStub =
new StringConfigAttribute(CHANGELOG_DIR_PATH_ATTR,
- "changelog storage directory path", false,
- false, true);
+ "changelog storage directory path", false, false, true);
+
+ /**
+ * The set of time units that will be used for expressing the
+ * changelog purge delay.
+ */
+ private static final LinkedHashMap<String,Double> purgeTimeUnits =
+ new LinkedHashMap<String,Double>();
+
+ static
+ {
+ purgeTimeUnits.put(TIME_UNIT_SECONDS_ABBR, 1D);
+ purgeTimeUnits.put(TIME_UNIT_SECONDS_FULL, 1D);
+ purgeTimeUnits.put(TIME_UNIT_MINUTES_ABBR, 60D);
+ purgeTimeUnits.put(TIME_UNIT_MINUTES_FULL, 1D);
+ purgeTimeUnits.put(TIME_UNIT_HOURS_ABBR, 60*60D);
+ purgeTimeUnits.put(TIME_UNIT_HOURS_FULL, 60*60D);
+ purgeTimeUnits.put(TIME_UNIT_DAYS_ABBR, 24*60*60D);
+ purgeTimeUnits.put(TIME_UNIT_DAYS_FULL, 24*60*60D);
+ }
+
+ static final IntegerWithUnitConfigAttribute purgeDelayStub =
+ new IntegerWithUnitConfigAttribute(PURGE_DELAY_ATTR,
+ "changelog purge delay", false, purgeTimeUnits, true, 0, false, 0);
/**
* Check if a ConfigEntry is valid.
@@ -299,6 +326,20 @@
e.getMessage() + " " + getFileForPath(dbDirname));
}
+ /*
+ * Read the Purge Delay (trim age) attribute
+ */
+ IntegerWithUnitConfigAttribute purgeDelayAttr =
+ (IntegerWithUnitConfigAttribute) config.getConfigAttribute(
+ purgeDelayStub);
+ if (purgeDelayAttr == null)
+ trimAge = 24*60*60; // not present : use the default value : 1 day
+ else
+ {
+ trimAge = purgeDelayAttr.activeCalculatedValue();
+ configAttributes.add(purgeDelayAttr);
+ }
+
initialize(changelogServerId, changelogPort);
configDn = config.getDN();
@@ -606,4 +647,16 @@
{
return new DbHandler(id, baseDn, this, dbEnv);
}
+
+ /**
+ * Retrieves the time after which changes must be deleted from the
+ * persistent storage (in milliseconds).
+ *
+ * @return The time after which changes must be deleted from the
+ * persistent storage (in milliseconds).
+ */
+ public long getTrimage()
+ {
+ return trimAge * 1000;
+ }
}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/DbHandler.java b/opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/DbHandler.java
index 90132ac..ebd5d63 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/DbHandler.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/DbHandler.java
@@ -83,13 +83,17 @@
private boolean done = false;
private DirectoryThread thread = null;
private Object flushLock = new Object();
+ /**
+ * the trim age in milliseconds.
+ */
+ private long trimage;
/**
* Creates a New dbHandler associated to a given LDAP server.
*
* @param id Identifier of the DB.
- * @param baseDn of the DB.
- * @param changelog the Changelog that creates this dbHandler.
+ * @param baseDn the baseDn for which this DB was created.
+ * @param changelog The Changelog that creates this dbHandler.
* @param dbenv the Database Env to use to create the Changelog DB.
* @throws DatabaseException If a database problem happened
*/
@@ -99,6 +103,7 @@
{
this.serverId = id;
this.baseDn = baseDn;
+ this.trimage = changelog.getTrimage();
db = new ChangelogDB(id, baseDn, changelog, dbenv);
firstChange = db.readFirstChange();
lastChange = db.readLastChange();
@@ -337,9 +342,10 @@
*/
private void trim() throws DatabaseException, Exception
{
+ if (trimage == 0)
+ return;
int size = 0;
boolean finished = false;
- int trimage = 24*60*60*1000; // TODO : make trim-age a config parameter
ChangeNumber trimDate = new ChangeNumber(TimeThread.getTime() - trimage,
(short) 0, (short)0);
@@ -493,4 +499,13 @@
return(baseDn + " " + serverId + " " + firstChange + " " + lastChange);
}
+ /**
+ * Set the Purge delay for this db Handler.
+ * @param delay The purge delay in Milliseconds.
+ */
+ public void setPurgeDelay(long delay)
+ {
+ trimage = delay;
+ }
+
}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/synchronization/common/LogMessages.java b/opendj-sdk/opends/src/server/org/opends/server/synchronization/common/LogMessages.java
index b592b88..3998104 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/synchronization/common/LogMessages.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/synchronization/common/LogMessages.java
@@ -274,6 +274,58 @@
CATEGORY_MASK_SYNC | SEVERITY_MASK_MILD_ERROR | 36;
/**
+ * The message ID for the description of the attribute used to specify the
+ * list of other Changelog Servers in the Changelog Server
+ * Configuration.
+ */
+ public static final int MSGID_CHANGELOG_SERVER_ATTR =
+ CATEGORY_MASK_CORE | SEVERITY_MASK_INFORMATIONAL | 37;
+
+ /**
+ * The message ID for the description of the attribute used to specify
+ * the identifier of the Changelog Server.
+ */
+ public static final int MSGID_SERVER_ID_ATTR =
+ CATEGORY_MASK_CORE | SEVERITY_MASK_INFORMATIONAL | 38;
+
+ /**
+ * The message id for the description of the attribute used to specify
+ * the port number of the Changelog Server.
+ */
+ public static final int MSGID_CHANGELOG_PORT_ATTR =
+ CATEGORY_MASK_CORE | SEVERITY_MASK_INFORMATIONAL | 39;
+
+ /**
+ * The message id for the description of the attribute used to specify
+ * the receive Window Size used by a Changelog Server.
+ */
+ public static final int MSGID_WINDOW_SIZE_ATTR =
+ CATEGORY_MASK_CORE | SEVERITY_MASK_INFORMATIONAL | 40;
+
+ /**
+ * The message id for thedescription of the attribute used to specify
+ * the maximum queue size used by a Changelog Server.
+ */
+ public static final int MSGID_QUEUE_SIZE_ATTR =
+ CATEGORY_MASK_CORE | SEVERITY_MASK_INFORMATIONAL | 41;
+
+ /**
+ * The message id for the Attribute used to specify the directory where the
+ * persistent storage of the Changelog server will be saved.
+ */
+ public static final int MSGID_CHANGELOG_DIR_PATH_ATTR =
+ CATEGORY_MASK_CORE | SEVERITY_MASK_INFORMATIONAL | 42;
+
+ /**
+ * The message id for thedescription of the attribute used to configure
+ * the purge delay of the Changelog Servers.
+ */
+ public static final int MSGID_PURGE_DELAY_ATTR =
+ CATEGORY_MASK_CORE | SEVERITY_MASK_INFORMATIONAL | 43;
+
+
+
+ /**
* Register the messages from this class in the core server.
*
*/
@@ -367,5 +419,35 @@
MessageHandler.registerMessage(MSGID_FILE_CHECK_CREATE_FAILED,
"An Exception was caught while testing existence or trying " +
" to create the directory for the changelog database : %s");
+ MessageHandler.registerMessage(MSGID_CHANGELOG_SERVER_ATTR,
+ "Specifies the list of Changelog Servers to which this" +
+ " Changelog Server should connect. Each value of this attribute" +
+ " should contain a values build with the hostname and the port" +
+ " number of the remote server separated with a \":\"");
+ MessageHandler.registerMessage(MSGID_SERVER_ID_ATTR,
+ "Specifies the server ID. Each Changelog Server in the topology" +
+ " Must be assigned a unique server ID in the topology.");
+ MessageHandler.registerMessage(MSGID_CHANGELOG_PORT_ATTR,
+ "Specifies the port number that the changelog server will use to" +
+ " listen for connections from LDAP servers.");
+ MessageHandler.registerMessage(MSGID_WINDOW_SIZE_ATTR,
+ "Specifies the receive window size of the changelog server.");
+ MessageHandler.registerMessage(MSGID_QUEUE_SIZE_ATTR,
+ "Specifies the receive queue size of the changelog server." +
+ " The Changelog servers will queue up to this number of messages" +
+ " in its memory queue and save the older messages to persistent" +
+ " storage. Using a larger size may improve performances when" +
+ " The synchronization delay is larger than this size but at the cost" +
+ " of using more memory.");
+ MessageHandler.registerMessage(MSGID_CHANGELOG_DIR_PATH_ATTR,
+ "Specifies the Changelog Server directory. The Changelog server" +
+ " will create all persistent storage below this path.");
+ MessageHandler.registerMessage(MSGID_PURGE_DELAY_ATTR,
+ "Specifies the Changelog Purge Delay, The Changelog servers will" +
+ " keep all changes up to this amount of time before deleting them." +
+ " This values defines the maximum age of a backup that can be" +
+ " restored because changelog servers would not be able to refresh" +
+ " LDAP servers with older versions of the data. A zero value" +
+ " can be used to specify an infinite delay (or never purge).");
}
}
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/changelog/dbHandlerTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/changelog/dbHandlerTest.java
new file mode 100644
index 0000000..5590012
--- /dev/null
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/changelog/dbHandlerTest.java
@@ -0,0 +1,129 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE
+ * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
+ * add the following below this CDDL HEADER, with the fields enclosed
+ * by brackets "[]" replaced with your own identifying * information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ * Portions Copyright 2006 Sun Microsystems, Inc.
+ */
+package org.opends.server.synchronization.changelog;
+
+import java.io.File;
+import java.net.ServerSocket;
+
+import org.opends.server.TestCaseUtils;
+import org.opends.server.config.ConfigEntry;
+import org.opends.server.synchronization.SynchronizationTestCase;
+import org.opends.server.synchronization.common.ChangeNumber;
+import org.opends.server.synchronization.common.ChangeNumberGenerator;
+import org.opends.server.synchronization.protocol.DeleteMsg;
+import org.opends.server.types.DN;
+import org.opends.server.types.Entry;
+import org.testng.annotations.Test;
+import static org.testng.Assert.*;
+
+/**
+ * Test the dbHandler class
+ */
+public class dbHandlerTest extends SynchronizationTestCase
+{
+ @Test()
+ void testDbHandlerTrim() throws Exception
+ {
+ TestCaseUtils.startServer();
+
+ // find a free port for the changelog server
+ ServerSocket socket = TestCaseUtils.bindFreePort();
+ int changelogPort = socket.getLocalPort();
+ socket.close();
+
+ // configure a Changelog server.
+ String changelogLdif =
+ "dn: cn=Changelog Server\n"
+ + "objectClass: top\n"
+ + "objectClass: ds-cfg-synchronization-changelog-server-config\n"
+ + "cn: Changelog Server\n"
+ + "ds-cfg-changelog-port: "+ changelogPort + "\n"
+ + "ds-cfg-changelog-server-id: 2\n"
+ + "ds-cfg-changelog-purge-delay: 0 d";
+ Entry tmp = TestCaseUtils.entryFromLdifString(changelogLdif);
+ ConfigEntry changelogConfig = new ConfigEntry(tmp, null);
+ Changelog changelog = new Changelog(changelogConfig);
+
+ // create or clean a directory for the dbHandler
+ String buildRoot = System.getProperty(TestCaseUtils.PROPERTY_BUILD_ROOT);
+ String path = buildRoot + File.separator + "dbHandler";
+ File testRoot = new File(path);
+ if (testRoot.exists())
+ {
+ TestCaseUtils.deleteDirectory(testRoot);
+ }
+ testRoot.mkdirs();
+
+ ChangelogDbEnv dbEnv = new ChangelogDbEnv(path, changelog);
+
+ DbHandler handler =
+ new DbHandler((short) 1, DN.decode("o=test"), changelog, dbEnv);
+
+ ChangeNumberGenerator gen = new ChangeNumberGenerator((short)1, 0);
+ ChangeNumber changeNumber1 = gen.NewChangeNumber();
+ ChangeNumber changeNumber2 = gen.NewChangeNumber();
+ ChangeNumber changeNumber3 = gen.NewChangeNumber();
+
+ DeleteMsg update1 = new DeleteMsg("o=test", changeNumber1, "uid");
+ DeleteMsg update2 = new DeleteMsg("o=test", changeNumber2, "uid");
+ DeleteMsg update3 = new DeleteMsg("o=test", changeNumber3, "uid");
+
+ handler.add(update1);
+ handler.add(update2);
+ handler.add(update3);
+
+ // The ChangeNumber should not get purged
+ assertEquals(changeNumber1, handler.getFirstChange());
+ assertEquals(changeNumber3, handler.getLastChange());
+
+ handler.setPurgeDelay(1);
+
+ boolean purged = false;
+ int count = 300; // wait at most 60 seconds
+ while (!purged && (count>0))
+ {
+ ChangeNumber firstChange = handler.getFirstChange();
+ ChangeNumber lastChange = handler.getLastChange();
+ if ((!firstChange.equals(changeNumber3) ||
+ (!lastChange.equals(changeNumber3))))
+ {
+ TestCaseUtils.sleep(100);
+ }
+ else
+ {
+ purged = true;
+ }
+ }
+
+ handler.shutdown();
+ dbEnv.shutdown();
+ changelog.shutdown();
+
+ TestCaseUtils.deleteDirectory(testRoot);
+ }
+
+}
--
Gitblit v1.10.0