opendj-server-legacy/resource/config/config.ldif
@@ -67,6 +67,7 @@ ds-cfg-allowed-task: org.opends.server.tasks.RestoreTask ds-cfg-allowed-task: org.opends.server.tasks.ShutdownTask ds-cfg-allowed-task: org.opends.server.tasks.PurgeConflictsHistoricalTask ds-cfg-allowed-task: org.opends.server.tasks.ResetChangeNumberTask dn: cn=Schema Providers,cn=config objectClass: top opendj-server-legacy/resource/schema/02-config.ldif
@@ -3781,6 +3781,24 @@ SYNTAX 1.3.6.1.4.1.1466.115.121.1.15 SINGLE-VALUE X-ORIGIN 'OpenDJ Directory Server' ) attributeTypes: ( 1.3.6.1.4.1.36733.2.1.1.145 NAME 'ds-task-reset-change-number-to' EQUALITY integerMatch SYNTAX 1.3.6.1.4.1.1466.115.121.1.27 SINGLE-VALUE X-ORIGIN 'OpenDJ Directory Server' ) attributeTypes: ( 1.3.6.1.4.1.36733.2.1.1.146 NAME 'ds-task-reset-change-number-base-dn' EQUALITY distinguishedNameMatch SYNTAX 1.3.6.1.4.1.1466.115.121.1.12 SINGLE-VALUE X-ORIGIN 'OpenDJ Directory Server' ) attributeTypes: ( 1.3.6.1.4.1.36733.2.1.1.147 NAME 'ds-task-reset-change-number-csn' EQUALITY caseIgnoreMatch SYNTAX 1.3.6.1.4.1.1466.115.121.1.15 SINGLE-VALUE X-ORIGIN 'OpenDJ Directory Server' ) objectClasses: ( 1.3.6.1.4.1.26027.1.2.1 NAME 'ds-cfg-access-control-handler' SUP top @@ -5815,4 +5833,11 @@ ds-cfg-disk-low-threshold $ ds-cfg-je-property ) X-ORIGIN 'OpenDJ Directory Server' ) objectClasses: ( 1.3.6.1.4.1.36733.2.1.2.27 NAME 'ds-task-reset-change-number' SUP ds-task STRUCTURAL MUST ( ds-task-reset-change-number-to $ ds-task-reset-change-number-base-dn $ ds-task-reset-change-number-csn ) X-ORIGIN 'OpenDS Directory Server' ) opendj-server-legacy/src/main/java/org/opends/server/config/ConfigConstants.java
@@ -4421,5 +4421,16 @@ public static final String ATTR_TASK_CONFLICTS_HIST_PURGE_COUNT = NAME_PREFIX_TASK + "purge-conflicts-historical-purged-values-count"; /** The name of the objectclass that will be used for a Directory Server reset change number task definition. */ public static final String OC_RESET_CHANGE_NUMBER_TASK = NAME_PREFIX_TASK + "reset-change-number"; /** The name of the attribute in a reset change number task that specifies the change number for the first change. */ public static final String ATTR_TASK_RESET_CHANGE_NUMBER_TO= NAME_PREFIX_TASK + "reset-change-number-to"; /** The name of the attribute in a reset change number task that specifies the csn of the new first change. */ public static final String ATTR_TASK_RESET_CHANGE_NUMBER_CSN = NAME_PREFIX_TASK + "reset-change-number-csn"; /** The name of the attribute in a reset change number task that specifies the basedn where the csn applies. */ public static final String ATTR_TASK_RESET_CHANGE_NUMBER_BASE_DN = NAME_PREFIX_TASK + "reset-change-number-base-dn"; } opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/api/ChangeNumberIndexDB.java
@@ -21,10 +21,12 @@ * CDDL HEADER END * * * Copyright 2013-2014 ForgeRock AS * Copyright 2013-2015 ForgeRock AS */ package org.opends.server.replication.server.changelog.api; import org.opends.server.replication.common.CSN; import org.opends.server.types.DN; /** * This class stores an index of all the changes seen by this server in the form @@ -98,4 +100,16 @@ DBCursor<ChangeNumberIndexRecord> getCursorFrom(long startChangeNumber) throws ChangelogException; /** * Resets ChangeNumber index to the given number and CSN. * @param newFirstCN * the new change number to appear as first change in the external changelog * @param baseDN * the new record for the first change * @param newFirstCSN * the CSN of the new first change * @throws ChangelogException * if an error occurs during reset */ void resetChangeNumberTo(long newFirstCN, DN baseDN, CSN newFirstCSN) throws ChangelogException; } opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/FileChangeNumberIndexDB.java
@@ -31,6 +31,7 @@ import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantReadWriteLock; import org.forgerock.i18n.slf4j.LocalizedLogger; import org.forgerock.opendj.config.server.ConfigException; @@ -117,6 +118,10 @@ private final AtomicBoolean shutdown = new AtomicBoolean(false); private final ReentrantReadWriteLock resetCNisRunningLock = new ReentrantReadWriteLock(false); private final FileChangelogDB changelogDB; /** * Creates a new JEChangeNumberIndexDB associated to a given LDAP server. * @@ -124,8 +129,9 @@ * server for this domain. * @throws ChangelogException If a database problem happened */ FileChangeNumberIndexDB(ReplicationEnvironment replicationEnv) throws ChangelogException FileChangeNumberIndexDB(FileChangelogDB changelogDB, ReplicationEnvironment replicationEnv) throws ChangelogException { this.changelogDB = changelogDB; log = replicationEnv.getOrCreateCNIndexDB(); final ChangeNumberIndexRecord newestRecord = readLastRecord(); newestChangeNumber = getChangeNumber(newestRecord); @@ -192,14 +198,30 @@ private long nextChangeNumber() { return lastGeneratedChangeNumber.incrementAndGet(); resetCNisRunningLock.readLock().lock(); try { long lgcn = lastGeneratedChangeNumber.incrementAndGet(); return lgcn; } finally { resetCNisRunningLock.readLock().unlock(); } } /** {@inheritDoc} */ @Override public long getLastGeneratedChangeNumber() { return lastGeneratedChangeNumber.get(); resetCNisRunningLock.readLock().lock(); try { long lgcn = lastGeneratedChangeNumber.get(); return lgcn; } finally { resetCNisRunningLock.readLock().unlock(); } } /** @@ -353,6 +375,31 @@ newestChangeNumber = NO_KEY; } /** * Same as {@code clear()}, with the addition of also resetting last GeneratedChangeNumber counter to the provided * value. * @param newStart * the new changeNumber for for the first change in the changelog */ public void clearAndSetChangeNumber(long newStart) throws ChangelogException { resetCNisRunningLock.writeLock().lock(); try{ clear(); lastGeneratedChangeNumber.set(newStart - 1); } finally { resetCNisRunningLock.writeLock().unlock(); } } @Override public void resetChangeNumberTo(long newFirstCN, DN baseDN, CSN newFirstCSN) throws ChangelogException { changelogDB.resetChangeNumberIndex(newFirstCN, baseDN, newFirstCSN); } /** Parser of records persisted in the FileChangeNumberIndex log. */ private static class ChangeNumberIndexDBParser implements RecordParser<Long, ChangeNumberIndexRecord> { opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
@@ -57,6 +57,7 @@ import org.opends.server.replication.server.ChangelogState; import org.opends.server.replication.server.ReplicationServer; import org.opends.server.replication.server.changelog.api.ChangeNumberIndexDB; import org.opends.server.replication.server.changelog.api.ChangeNumberIndexRecord; import org.opends.server.replication.server.changelog.api.ChangelogDB; import org.opends.server.replication.server.changelog.api.ChangelogException; import org.opends.server.replication.server.changelog.api.DBCursor; @@ -344,39 +345,13 @@ return; } shutdownCNIndexerAndPurger(); // Remember the first exception because : // - we want to try to remove everything we want to remove // - then throw the first encountered exception ChangelogException firstException = null; final ChangeNumberIndexer indexer = cnIndexer.getAndSet(null); if (indexer != null) { indexer.initiateShutdown(); } final ChangelogDBPurger purger = cnPurger.getAndSet(null); if (purger != null) { purger.initiateShutdown(); } // wait for shutdown of the threads holding cursors try { if (indexer != null) { indexer.join(); } if (purger != null) { purger.join(); } } catch (InterruptedException e) { // do nothing: we are already shutting down } // now we can safely shutdown all DBs try { @@ -411,6 +386,37 @@ } } private void shutdownCNIndexerAndPurger() { final ChangeNumberIndexer indexer = cnIndexer.getAndSet(null); if (indexer != null) { indexer.initiateShutdown(); } final ChangelogDBPurger purger = cnPurger.getAndSet(null); if (purger != null) { purger.initiateShutdown(); } // wait for shutdown of the threads holding cursors try { if (indexer != null) { indexer.join(); } if (purger != null) { purger.join(); } } catch (InterruptedException e) { // do nothing: we are already shutting down } } /** * Clears all records from the changelog (does not remove the changelog itself). * @@ -578,20 +584,7 @@ if (purgeDelayInMillis > 0) { final ChangelogDBPurger newPurger = new ChangelogDBPurger(); if (cnPurger.compareAndSet(null, newPurger)) { // no purger was running, run this new one newPurger.start(); } else { // a purger was already running, just wake that one up // to verify if some entries can be purged with the new purge delay final ChangelogDBPurger currentPurger = cnPurger.get(); synchronized (currentPurger) { currentPurger.notify(); } } startCNPurger(); } else { @@ -603,6 +596,24 @@ } } private void startCNPurger() { final ChangelogDBPurger newPurger = new ChangelogDBPurger(); if (cnPurger.compareAndSet(null, newPurger)) { // no purger was running, run this new one newPurger.start(); } else { // a purger was already running, just wake that one up // to verify if some entries can be purged final ChangelogDBPurger currentPurger = cnPurger.get(); synchronized (currentPurger) { currentPurger.notify(); } } } /** {@inheritDoc} */ @Override public void setComputeChangeNumber(final boolean computeChangeNumber) @@ -622,6 +633,35 @@ } } void resetChangeNumberIndex(long newFirstCN, DN baseDN, CSN newFirstCSN) throws ChangelogException { if (!config.isComputeChangeNumber()) { throw new ChangelogException(ERR_REPLICATION_CHANGE_NUMBER_DISABLED.get(baseDN)); } if (!getDomainNewestCSNs(baseDN).cover(newFirstCSN)) { throw new ChangelogException(ERR_CHANGELOG_RESET_CHANGE_NUMBER_CHANGE_NOT_PRESENT.get(newFirstCN, baseDN, newFirstCSN)); } if (getDomainOldestCSNs(baseDN).getCSN(newFirstCSN.getServerId()).isNewerThan(newFirstCSN)) { throw new ChangelogException(ERR_CHANGELOG_RESET_CHANGE_NUMBER_CSN_TOO_OLD.get(newFirstCN, newFirstCSN)); } shutdownCNIndexerAndPurger(); synchronized (cnIndexDBLock) { cnIndexDB.clearAndSetChangeNumber(newFirstCN); cnIndexDB.addRecord(new ChangeNumberIndexRecord(newFirstCN, baseDN, newFirstCSN)); } startIndexer(); if (purgeDelayInMillis > 0) { startCNPurger(); } } private void startIndexer() { final ChangeNumberIndexer indexer = new ChangeNumberIndexer(this, replicationEnv); @@ -631,7 +671,6 @@ } } /** {@inheritDoc} */ @Override public ChangeNumberIndexDB getChangeNumberIndexDB() { @@ -641,7 +680,7 @@ { try { cnIndexDB = new FileChangeNumberIndexDB(replicationEnv); cnIndexDB = new FileChangeNumberIndexDB(this, replicationEnv); } catch (Exception e) { opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDB.java
@@ -26,6 +26,8 @@ */ package org.opends.server.replication.server.changelog.je; import static org.opends.messages.ReplicationMessages.*; import java.util.ArrayList; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; @@ -375,4 +377,10 @@ newestChangeNumber = NO_KEY; } @Override public void resetChangeNumberTo(long newFirstCN, DN baseDN, CSN newFirstCSN) throws ChangelogException { throw new ChangelogException(ERR_CHANGELOG_RESET_CHANGE_NUMBER_UNSUPPORTED.get()); } } opendj-server-legacy/src/main/java/org/opends/server/tasks/PurgeConflictsHistoricalTask.java
@@ -77,15 +77,6 @@ private TaskState initState; private static void debugInfo(String s) { if (logger.isTraceEnabled()) { System.out.println(LocalizableMessage.raw(s)); logger.trace(s); } } /** {@inheritDoc} */ @Override public LocalizableMessage getDisplayName() { @@ -146,12 +137,8 @@ protected TaskState runTask() { Boolean purgeCompletedInTime = false; if (logger.isTraceEnabled()) { debugInfo("[PURGE] PurgeConflictsHistoricalTask is starting " + "on domain: " + domain.getBaseDN() + "max duration (sec):" + purgeTaskMaxDurationInSec); } logger.trace("PurgeConflictsHistoricalTask is starting on domain: %s max duration (sec): %d", domain.getBaseDN(), purgeTaskMaxDurationInSec); try { replaceAttributeValue(ATTR_TASK_CONFLICTS_HIST_PURGE_COMPLETED_IN_TIME, purgeCompletedInTime.toString()); @@ -166,7 +153,7 @@ } catch(DirectoryException de) { debugInfo("[PURGE] PurgeConflictsHistoricalTask exception " + de.getLocalizedMessage()); logger.trace("PurgeConflictsHistoricalTask exception %s", de.getLocalizedMessage()); if (de.getResultCode() != ResultCode.ADMIN_LIMIT_EXCEEDED) { // Error raised at submission time @@ -185,20 +172,17 @@ // sets in the attributes the last stats values replaceAttributeValue(ATTR_TASK_CONFLICTS_HIST_PURGE_COUNT, String.valueOf(purgeCount)); replaceAttributeValue(ATTR_TASK_CONFLICTS_HIST_PURGE_LAST_CSN, lastCSN.toStringUI()); debugInfo("[PURGE] PurgeConflictsHistoricalTask write attrs "); logger.trace("PurgeConflictsHistoricalTask write attrs %d", purgeCount); } catch(Exception e) { debugInfo("[PURGE] PurgeConflictsHistoricalTask exception " + e.getLocalizedMessage()); logger.trace("PurgeConflictsHistoricalTask exception %s", e.getLocalizedMessage()); initState = TaskState.STOPPED_BY_ERROR; } } if (logger.isTraceEnabled()) { debugInfo("[PURGE] PurgeConflictsHistoricalTask is ending with state:" + initState + " completedInTime:" + purgeCompletedInTime); } logger.trace("PurgeConflictsHistoricalTask is ending with state: %s completedInTime: %s", initState, purgeCompletedInTime); return initState; } @@ -231,12 +215,12 @@ { replaceAttributeValue(ATTR_TASK_CONFLICTS_HIST_PURGE_COUNT, String.valueOf(purgeCount)); replaceAttributeValue(ATTR_TASK_CONFLICTS_HIST_PURGE_LAST_CSN, lastCSN.toStringUI()); debugInfo("[PURGE] PurgeConflictsHistoricalTask write attrs " + purgeCount); logger.trace("PurgeConflictsHistoricalTask write attrs %d", purgeCount); } } catch(DirectoryException de) { debugInfo("[PURGE] PurgeConflictsHistoricalTask exception " + de.getLocalizedMessage()); logger.trace("PurgeConflictsHistoricalTask exception %s", de.getLocalizedMessage()); initState = TaskState.STOPPED_BY_ERROR; } } opendj-server-legacy/src/main/java/org/opends/server/tasks/ResetChangeNumberTask.java
New file @@ -0,0 +1,136 @@ /* * CDDL HEADER START * * The contents of this file are subject to the terms of the * Common Development and Distribution License, Version 1.0 only * (the "License"). You may not use this file except in compliance * with the License. * * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt * or http://forgerock.org/license/CDDLv1.0.html. * See the License for the specific language governing permissions * and limitations under the License. * * When distributing Covered Code, include this CDDL HEADER in each * file and include the License file at legal-notices/CDDLv1_0.txt. * If applicable, add the following below this CDDL HEADER, with the * fields enclosed by brackets "[]" replaced with your own identifying * information: * Portions Copyright [yyyy] [name of copyright owner] * * CDDL HEADER END * * * Copyright 2015 ForgeRock AS */ package org.opends.server.tasks; import org.forgerock.i18n.LocalizableMessage; import org.forgerock.i18n.slf4j.LocalizedLogger; import org.opends.server.backends.task.Task; import org.opends.server.backends.task.TaskState; import org.opends.server.replication.common.CSN; import org.opends.server.replication.server.ReplicationServer; import org.opends.server.replication.server.changelog.api.ChangelogException; import org.opends.server.types.Attribute; import org.opends.server.types.AttributeType; import org.opends.server.types.DN; import org.opends.server.types.DirectoryException; import org.opends.server.types.Entry; import java.util.List; import static org.forgerock.opendj.ldap.ResultCode.*; import static org.opends.server.config.ConfigConstants.ATTR_TASK_RESET_CHANGE_NUMBER_BASE_DN; import static org.opends.server.config.ConfigConstants.ATTR_TASK_RESET_CHANGE_NUMBER_CSN; import static org.opends.server.config.ConfigConstants.ATTR_TASK_RESET_CHANGE_NUMBER_TO; import static org.opends.server.core.DirectoryServer.getAttributeTypeOrDefault; import static org.opends.messages.TaskMessages.*; /** * This class provides an implementation of a Directory Server task that can * be used to rebuild the change number index with a given change number and a * change represented by its CSN. */ public class ResetChangeNumberTask extends Task { private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass(); private int newFirstChangeNumber; private DN baseDN; private CSN newFirstCSN; private ReplicationServer targetRS; @Override public LocalizableMessage getDisplayName() { return INFO_TASK_RESET_CHANGE_NUMBER.get(); } @Override public void initializeTask() throws DirectoryException { if (TaskState.isDone(getTaskState())) { return; } final Entry taskEntry = getTaskEntry(); newFirstChangeNumber = TaskUtils.getSingleValueInteger( getTaskParameter(taskEntry, ATTR_TASK_RESET_CHANGE_NUMBER_TO), 1); newFirstCSN = CSN.valueOf(TaskUtils.getSingleValueString( getTaskParameter(taskEntry, ATTR_TASK_RESET_CHANGE_NUMBER_CSN))); baseDN = DN.valueOf(TaskUtils.getSingleValueString( getTaskParameter(taskEntry, ATTR_TASK_RESET_CHANGE_NUMBER_BASE_DN))); if (newFirstChangeNumber < 1) { throw new DirectoryException(UNWILLING_TO_PERFORM, ERR_TASK_RESET_CHANGE_NUMBER_INVALID.get(newFirstChangeNumber)); } List<ReplicationServer> allRSes = ReplicationServer.getAllInstances(); if (allRSes.isEmpty()) { throw new DirectoryException(NO_SUCH_OBJECT, ERR_TASK_RESET_CHANGE_NUMBER_NO_RSES.get()); } for (ReplicationServer rs : allRSes) { if (rs.getReplicationServerDomain(baseDN) != null) { targetRS = rs; return; } } throw new DirectoryException(NO_SUCH_OBJECT, ERR_TASK_RESET_CHANGE_NUMBER_CHANGELOG_NOT_FOUND.get(baseDN)); } private List<Attribute> getTaskParameter(Entry taskEntry, String attrTaskResetChangeNumberTo) { AttributeType taskAttr = getAttributeTypeOrDefault(attrTaskResetChangeNumberTo); return taskEntry.getAttribute(taskAttr); } @Override protected TaskState runTask() { logger.trace("Reset change number task is starting with new changeNumber %d having CSN %s", newFirstChangeNumber, newFirstCSN); try { targetRS.getChangelogDB().getChangeNumberIndexDB().resetChangeNumberTo(newFirstChangeNumber, baseDN, newFirstCSN); return returnWithDebug(TaskState.COMPLETED_SUCCESSFULLY); } catch (ChangelogException ce) { logger.error(ERR_TASK_RESET_CHANGE_NUMBER_FAILED, ce.getMessageObject()); return returnWithDebug(TaskState.STOPPED_BY_ERROR); } } private TaskState returnWithDebug(TaskState state) { logger.trace("state: %s", state); return state; } } opendj-server-legacy/src/messages/org/opends/messages/replication.properties
@@ -525,8 +525,6 @@ change %s to replicaDB %s %s because: %s ERR_COULD_NOT_ADD_CHANGE_TO_SHUTTING_DOWN_REPLICA_DB_240=Could not add \ change %s to replicaDB %s %s because flushing thread is shutting down NOTE_SEARCH_CHANGELOG_INSUFFICIENT_PRIVILEGES_285=You do not have sufficient privileges to \ perform a search request on cn=changelog ERR_CHANGELOG_READ_STATE_WRONG_ROOT_PATH_241=Error when retrieving changelog \ state from root path '%s' : directory might not exist ERR_CHANGELOG_READ_STATE_NO_GENERATION_ID_FOUND_242=Error when retrieving \ @@ -631,4 +629,9 @@ ERR_CHANGELOG_CURSOR_ABORTED_290=Cursor on log '%s' has been aborted after \ a purge or a clear ERR_CHANGELOG_CANNOT_READ_NEWEST_RECORD_291=Could not position and read newest record from log file '%s' ERR_CHANGELOG_RESET_CHANGE_NUMBER_UNSUPPORTED_292=The JE based changelog does not support resetting the change number ERR_CHANGELOG_RESET_CHANGE_NUMBER_CHANGE_NOT_PRESENT_293=The change number index could not be reset to start with %d \ in base DN '%s' because starting CSN '%s' does not exist in the change log ERR_CHANGELOG_RESET_CHANGE_NUMBER_CSN_TOO_OLD_294=The change number could not be reset to %d because the associated \ change with CSN '%s' has already been purged from the change log. Try resetting to a more recent change ERR_REPLICATION_CHANGE_NUMBER_DISABLED_295=Change number indexing is disabled for replication domain '%s' opendj-server-legacy/src/messages/org/opends/messages/task.properties
@@ -197,3 +197,10 @@ INFO_TASK_PURGE_CONFLICTS_HIST_NAME_109=Purge conflicts historical ERR_TASK_INVALID_ATTRIBUTE_VALUE_110=Attribute %s has an invalid value. \ Reason: %s INFO_TASK_RESET_CHANGE_NUMBER_111=Reset change number index to begin with a given number and change. ERR_TASK_RESET_CHANGE_NUMBER_CHANGELOG_NOT_FOUND_112=No changelog database was found for baseDN '%s'. Either the \ baseDN is not replicated or its changelog has not been enabled in this server. ERR_TASK_RESET_CHANGE_NUMBER_NO_RSES_113=The change number index cannot be reset because this OpenDJ instance \ does not appear to be a replication server ERR_TASK_RESET_CHANGE_NUMBER_INVALID_114=Invalid change number (%d) specified, it must be greater than zero ERR_TASK_RESET_CHANGE_NUMBER_FAILED_115=Unable to reset the change number index: %s opendj-server-legacy/tests/unit-tests-testng/resource/config-small.ldif
@@ -67,6 +67,7 @@ ds-cfg-allowed-task: org.opends.server.tasks.RestoreTask ds-cfg-allowed-task: org.opends.server.tasks.ShutdownTask ds-cfg-allowed-task: org.opends.server.tasks.PurgeConflictsHistoricalTask ds-cfg-allowed-task: org.opends.server.tasks.ResetChangeNumberTask dn: cn=Schema Providers,cn=config objectClass: top