opendj-sdk/opends/resource/schema/02-config.ldif
@@ -1080,6 +1080,10 @@ NAME 'ds-cfg-certificate-fingerprint-algorithm' SYNTAX 1.3.6.1.4.1.1466.115.121.1.15 SINGLE-VALUE X-ORIGIN 'OpenDS Directory Server' ) attributeTypes: ( 1.3.6.1.4.1.26027.1.1.319 NAME 'ds-cfg-changelog-purge-delay' SYNTAX 1.3.6.1.4.1.1466.115.121.1.27 SINGLE-VALUE X-ORIGIN 'OpenDS Directory Server' ) objectClasses: ( 1.3.6.1.4.1.26027.1.2.1 NAME 'ds-cfg-access-control-handler' SUP top STRUCTURAL MUST ( cn $ ds-cfg-acl-handler-class $ ds-cfg-acl-handler-enabled ) opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/Changelog.java
@@ -29,6 +29,7 @@ import static org.opends.server.loggers.Error.logError; import static org.opends.server.messages.MessageHandler.getMessage; import static org.opends.server.synchronization.common.LogMessages.*; import static org.opends.server.util.ServerConstants.*; import static org.opends.server.util.StaticUtils.getFileForPath; import java.io.File; @@ -40,6 +41,7 @@ import java.net.UnknownHostException; import java.util.ArrayList; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; import java.util.Set; @@ -49,6 +51,7 @@ import org.opends.server.config.ConfigEntry; import org.opends.server.config.ConfigException; import org.opends.server.config.IntegerConfigAttribute; import org.opends.server.config.IntegerWithUnitConfigAttribute; import org.opends.server.config.StringConfigAttribute; import org.opends.server.core.DirectoryServer; import org.opends.server.messages.MessageHandler; @@ -100,6 +103,8 @@ private int rcvWindow; private int queueSize; private String dbDirname = null; private long trimAge; // the time (in sec) after which the changes must // be deleted from the persistent storage. 
static final String CHANGELOG_SERVER_ATTR = "ds-cfg-changelog-server";
static final String SERVER_ID_ATTR = "ds-cfg-changelog-server-id";
@@ -107,11 +112,12 @@
static final String WINDOW_SIZE_ATTR = "ds-cfg-window-size";
static final String QUEUE_SIZE_ATTR = "ds-cfg-changelog-max-queue-size";
static final String CHANGELOG_DIR_PATH_ATTR = "ds-cfg-changelog-db-dirname";
static final String PURGE_DELAY_ATTR = "ds-cfg-changelog-purge-delay";

static final IntegerConfigAttribute changelogPortStub =
  new IntegerConfigAttribute(CHANGELOG_PORT_ATTR, "changelog port",
                             true, false, false, true, 0, true, 65535);
                             true, false, false, true, 0, true, 65535);

static final IntegerConfigAttribute serverIdStub =
  new IntegerConfigAttribute(SERVER_ID_ATTR, "server ID", true, false,
@@ -119,8 +125,7 @@
static final StringConfigAttribute changelogStub =
  new StringConfigAttribute(CHANGELOG_SERVER_ATTR,
                            "changelog server information", true, true, false);
                            "changelog server information", true, true, false);

static final IntegerConfigAttribute windowStub =
  new IntegerConfigAttribute(WINDOW_SIZE_ATTR, "window size",
@@ -132,8 +137,30 @@
static final StringConfigAttribute dbDirnameStub =
  new StringConfigAttribute(CHANGELOG_DIR_PATH_ATTR,
                            "changelog storage directory path", false, false, true);
                            "changelog storage directory path", false, false, true);

/**
 * The set of time units that will be used for expressing the
 * changelog purge delay.  Each unit name (abbreviated and full form)
 * is mapped to the number of seconds it represents, so that the
 * calculated value of the purge delay attribute is always expressed
 * in seconds.
 */
private static final LinkedHashMap<String,Double> purgeTimeUnits =
  new LinkedHashMap<String,Double>();

static
{
  purgeTimeUnits.put(TIME_UNIT_SECONDS_ABBR, 1D);
  purgeTimeUnits.put(TIME_UNIT_SECONDS_FULL, 1D);
  purgeTimeUnits.put(TIME_UNIT_MINUTES_ABBR, 60D);
  // The full unit name must use the same multiplier as its abbreviation:
  // one minute is 60 seconds.
  purgeTimeUnits.put(TIME_UNIT_MINUTES_FULL, 60D);
  purgeTimeUnits.put(TIME_UNIT_HOURS_ABBR, 60*60D);
  purgeTimeUnits.put(TIME_UNIT_HOURS_FULL, 60*60D);
  purgeTimeUnits.put(TIME_UNIT_DAYS_ABBR, 24*60*60D);
  purgeTimeUnits.put(TIME_UNIT_DAYS_FULL, 24*60*60D);
}

/**
 * The stub used to read the changelog purge delay attribute from the
 * configuration entry.  The value is expressed with one of the units
 * registered in purgeTimeUnits.
 * NOTE(review): the trailing arguments presumably declare a lower
 * bound of 0 and no upper bound -- confirm against
 * IntegerWithUnitConfigAttribute.
 */
static final IntegerWithUnitConfigAttribute purgeDelayStub =
  new IntegerWithUnitConfigAttribute(PURGE_DELAY_ATTR,
                                     "changelog purge delay",
                                     false, purgeTimeUnits, true, 0, false, 0);

/**
 * Check if a ConfigEntry is valid.
@@ -299,6 +326,20 @@
          e.getMessage() + " " + getFileForPath(dbDirname));
}

/*
 * Read the Purge Delay (trim age) attribute.
 * The calculated value is stored in seconds; getTrimage() converts it
 * to milliseconds for its callers.
 */
IntegerWithUnitConfigAttribute purgeDelayAttr =
  (IntegerWithUnitConfigAttribute) config.getConfigAttribute(
      purgeDelayStub);
if (purgeDelayAttr == null)
  trimAge = 24*60*60; // not present : use the default value : 1 day
else
{
  trimAge = purgeDelayAttr.activeCalculatedValue();
  configAttributes.add(purgeDelayAttr);
}

initialize(changelogServerId, changelogPort);
configDn = config.getDN();
@@ -606,4 +647,16 @@
{
  return new DbHandler(id, baseDn, this, dbEnv);
}

/**
 * Retrieves the time after which changes must be deleted from the
 * persistent storage (in milliseconds).  The configured value is kept
 * in seconds and converted here.
 *
 * @return  The time after which changes must be deleted from the
 *          persistent storage (in milliseconds).
 */
public long getTrimage()
{
  return trimAge * 1000;
}
}
opendj-sdk/opends/src/server/org/opends/server/synchronization/changelog/DbHandler.java
@@ -83,13 +83,17 @@ private boolean done = false; private DirectoryThread thread = null; private Object flushLock = new Object(); /** * The trim age in milliseconds. It is initialized from the Changelog in the constructor and can be changed with setPurgeDelay; trim() returns immediately when it is 0. */ private long trimage; /** * Creates a New dbHandler associated to a given LDAP server. * * @param id Identifier of the DB. * @param baseDn of the DB. * @param changelog the Changelog that creates this dbHandler. * @param baseDn the baseDn for which this DB was created. * @param changelog The Changelog that creates this dbHandler. * @param dbenv the Database Env to use to create the Changelog DB. * @throws DatabaseException If a database problem happened */ @@ -99,6 +103,7 @@ { this.serverId = id; this.baseDn = baseDn; this.trimage = changelog.getTrimage(); db = new ChangelogDB(id, baseDn, changelog, dbenv); firstChange = db.readFirstChange(); lastChange = db.readLastChange(); @@ -337,9 +342,10 @@ */ private void trim() throws DatabaseException, Exception { if (trimage == 0) return; int size = 0; boolean finished = false; int trimage = 24*60*60*1000; // TODO : make trim-age a config parameter ChangeNumber trimDate = new ChangeNumber(TimeThread.getTime() - trimage, (short) 0, (short)0); @@ -493,4 +499,13 @@ return(baseDn + " " + serverId + " " + firstChange + " " + lastChange); } /** * Sets the purge delay (trim age) for this DbHandler. trim() returns without doing anything when the delay is 0, so a zero delay disables trimming. * * @param delay The purge delay in milliseconds. */ public void setPurgeDelay(long delay) { trimage = delay; } } opendj-sdk/opends/src/server/org/opends/server/synchronization/common/LogMessages.java
@@ -274,6 +274,58 @@
    CATEGORY_MASK_SYNC | SEVERITY_MASK_MILD_ERROR | 36;

// The attribute description IDs below belong to the synchronization
// category, like every other message ID of this class: they continue the
// numbering of the synchronization messages (36 is the previous one) and
// must not be allocated in the core category.

/**
 * The message ID for the description of the attribute used to specify the
 * list of other Changelog Servers in the Changelog Server
 * Configuration.
 */
public static final int MSGID_CHANGELOG_SERVER_ATTR =
    CATEGORY_MASK_SYNC | SEVERITY_MASK_INFORMATIONAL | 37;

/**
 * The message ID for the description of the attribute used to specify
 * the identifier of the Changelog Server.
 */
public static final int MSGID_SERVER_ID_ATTR =
    CATEGORY_MASK_SYNC | SEVERITY_MASK_INFORMATIONAL | 38;

/**
 * The message ID for the description of the attribute used to specify
 * the port number of the Changelog Server.
 */
public static final int MSGID_CHANGELOG_PORT_ATTR =
    CATEGORY_MASK_SYNC | SEVERITY_MASK_INFORMATIONAL | 39;

/**
 * The message ID for the description of the attribute used to specify
 * the receive Window Size used by a Changelog Server.
 */
public static final int MSGID_WINDOW_SIZE_ATTR =
    CATEGORY_MASK_SYNC | SEVERITY_MASK_INFORMATIONAL | 40;

/**
 * The message ID for the description of the attribute used to specify
 * the maximum queue size used by a Changelog Server.
 */
public static final int MSGID_QUEUE_SIZE_ATTR =
    CATEGORY_MASK_SYNC | SEVERITY_MASK_INFORMATIONAL | 41;

/**
 * The message ID for the description of the attribute used to specify
 * the directory where the persistent storage of the Changelog server
 * will be saved.
 */
public static final int MSGID_CHANGELOG_DIR_PATH_ATTR =
    CATEGORY_MASK_SYNC | SEVERITY_MASK_INFORMATIONAL | 42;

/**
 * The message ID for the description of the attribute used to configure
 * the purge delay of the Changelog Servers.
 */
public static final int MSGID_PURGE_DELAY_ATTR =
    CATEGORY_MASK_SYNC | SEVERITY_MASK_INFORMATIONAL | 43;

/**
 * Register the messages from this class in the core server.
 *
 */
@@ -367,5 +419,35 @@
    MessageHandler.registerMessage(MSGID_FILE_CHECK_CREATE_FAILED,
        "An Exception was caught while testing existence or trying " +
        " to create the directory for the changelog database : %s");
    MessageHandler.registerMessage(MSGID_CHANGELOG_SERVER_ATTR,
        "Specifies the list of Changelog Servers to which this" +
        " Changelog Server should connect. Each value of this attribute" +
        " should contain a values build with the hostname and the port" +
        " number of the remote server separated with a \":\"");
    MessageHandler.registerMessage(MSGID_SERVER_ID_ATTR,
        "Specifies the server ID. Each Changelog Server in the topology" +
        " Must be assigned a unique server ID in the topology.");
    MessageHandler.registerMessage(MSGID_CHANGELOG_PORT_ATTR,
        "Specifies the port number that the changelog server will use to" +
        " listen for connections from LDAP servers.");
    MessageHandler.registerMessage(MSGID_WINDOW_SIZE_ATTR,
        "Specifies the receive window size of the changelog server.");
    MessageHandler.registerMessage(MSGID_QUEUE_SIZE_ATTR,
        "Specifies the receive queue size of the changelog server." +
        " The Changelog servers will queue up to this number of messages" +
        " in its memory queue and save the older messages to persistent" +
        " storage. Using a larger size may improve performances when" +
        " The synchronization delay is larger than this size but at the cost" +
        " of using more memory.");
    MessageHandler.registerMessage(MSGID_CHANGELOG_DIR_PATH_ATTR,
        "Specifies the Changelog Server directory. The Changelog server" +
        " will create all persistent storage below this path.");
    MessageHandler.registerMessage(MSGID_PURGE_DELAY_ATTR,
        "Specifies the Changelog Purge Delay, The Changelog servers will" +
        " keep all changes up to this amount of time before deleting them." +
        " This values defines the maximum age of a backup that can be" +
        " restored because changelog servers would not be able to refresh" +
        " LDAP servers with older versions of the data. A zero value" +
        " can be used to specify an infinite delay (or never purge).");
  }
}
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/synchronization/changelog/dbHandlerTest.java
New file
@@ -0,0 +1,129 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License"). You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE
 * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
 * add the following below this CDDL HEADER, with the fields enclosed
 * by brackets "[]" replaced with your own identifying
 * information:
 * Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 * Portions Copyright 2006 Sun Microsystems, Inc.
 */
package org.opends.server.synchronization.changelog;

import java.io.File;
import java.net.ServerSocket;

import org.opends.server.TestCaseUtils;
import org.opends.server.config.ConfigEntry;
import org.opends.server.synchronization.SynchronizationTestCase;
import org.opends.server.synchronization.common.ChangeNumber;
import org.opends.server.synchronization.common.ChangeNumberGenerator;
import org.opends.server.synchronization.protocol.DeleteMsg;
import org.opends.server.types.DN;
import org.opends.server.types.Entry;
import org.testng.annotations.Test;

import static org.testng.Assert.*;

/**
 * Test the dbHandler class.
 */
public class dbHandlerTest extends SynchronizationTestCase
{
  /**
   * Checks that a DbHandler keeps every change while the purge delay is
   * zero, and that it trims the older changes once a very short purge
   * delay has been set.
   *
   * @throws Exception If the test setup or the handler fails unexpectedly.
   */
  @Test()
  void testDbHandlerTrim() throws Exception
  {
    TestCaseUtils.startServer();

    // find a free port for the changelog server
    ServerSocket socket = TestCaseUtils.bindFreePort();
    int changelogPort = socket.getLocalPort();
    socket.close();

    // configure a Changelog server.
    // The purge delay is set to 0 so that nothing is trimmed at first.
    String changelogLdif =
        "dn: cn=Changelog Server\n"
        + "objectClass: top\n"
        + "objectClass: ds-cfg-synchronization-changelog-server-config\n"
        + "cn: Changelog Server\n"
        + "ds-cfg-changelog-port: "+ changelogPort + "\n"
        + "ds-cfg-changelog-server-id: 2\n"
        + "ds-cfg-changelog-purge-delay: 0 d";
    Entry tmp = TestCaseUtils.entryFromLdifString(changelogLdif);
    ConfigEntry changelogConfig = new ConfigEntry(tmp, null);
    Changelog changelog = new Changelog(changelogConfig);

    // create or clean a directory for the dbHandler
    String buildRoot = System.getProperty(TestCaseUtils.PROPERTY_BUILD_ROOT);
    String path = buildRoot + File.separator + "dbHandler";
    File testRoot = new File(path);
    if (testRoot.exists())
    {
      TestCaseUtils.deleteDirectory(testRoot);
    }
    testRoot.mkdirs();

    ChangelogDbEnv dbEnv = new ChangelogDbEnv(path, changelog);

    DbHandler handler =
        new DbHandler((short) 1, DN.decode("o=test"), changelog, dbEnv);

    ChangeNumberGenerator gen = new ChangeNumberGenerator((short)1, 0);
    ChangeNumber changeNumber1 = gen.NewChangeNumber();
    ChangeNumber changeNumber2 = gen.NewChangeNumber();
    ChangeNumber changeNumber3 = gen.NewChangeNumber();

    DeleteMsg update1 = new DeleteMsg("o=test", changeNumber1, "uid");
    DeleteMsg update2 = new DeleteMsg("o=test", changeNumber2, "uid");
    DeleteMsg update3 = new DeleteMsg("o=test", changeNumber3, "uid");

    handler.add(update1);
    handler.add(update2);
    handler.add(update3);

    // The ChangeNumber should not get purged
    assertEquals(changeNumber1, handler.getFirstChange());
    assertEquals(changeNumber3, handler.getLastChange());

    // Switch to a 1 ms purge delay: the older changes become eligible
    // for trimming.
    handler.setPurgeDelay(1);

    boolean purged = false;
    int count = 600;  // 600 polls of 100 ms : wait at most 60 seconds
    while (!purged && (count > 0))
    {
      ChangeNumber firstChange = handler.getFirstChange();
      ChangeNumber lastChange = handler.getLastChange();
      if ((!firstChange.equals(changeNumber3) ||
          (!lastChange.equals(changeNumber3))))
      {
        TestCaseUtils.sleep(100);
        // Consume one poll so that the loop ends even if the purge
        // never happens.
        count--;
      }
      else
      {
        purged = true;
      }
    }

    handler.shutdown();
    dbEnv.shutdown();
    changelog.shutdown();

    TestCaseUtils.deleteDirectory(testRoot);

    // Checked after the cleanup so that the resources are released
    // whatever the outcome of the purge.
    assertTrue(purged,
        "The changes older than the purge delay have not been trimmed");
  }
}