From 90f97a8328fb2433989b4ac05dd565dc6b4db4bc Mon Sep 17 00:00:00 2001
From: Fabio Pistolesi <fabio.pistolesi@forgerock.com>
Date: Mon, 02 Nov 2015 09:41:59 +0000
Subject: [PATCH] OPENDJ-1937 Replication draft change log changeNumber attribute should be synchronized with other RSs during initialization
---
opendj-server-legacy/src/main/java/org/opends/server/config/ConfigConstants.java | 11 +
opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDB.java | 8 +
opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/FileChangelogDB.java | 127 ++++++++++-----
opendj-server-legacy/tests/unit-tests-testng/resource/config-small.ldif | 1
opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/api/ChangeNumberIndexDB.java | 16 +
opendj-server-legacy/src/messages/org/opends/messages/replication.properties | 9
opendj-server-legacy/src/main/java/org/opends/server/tasks/ResetChangeNumberTask.java | 136 +++++++++++++++++
opendj-server-legacy/src/messages/org/opends/messages/task.properties | 9 +
opendj-server-legacy/resource/schema/02-config.ldif | 27 +++
opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/FileChangeNumberIndexDB.java | 53 ++++++
opendj-server-legacy/src/main/java/org/opends/server/tasks/PurgeConflictsHistoricalTask.java | 34 +---
opendj-server-legacy/resource/config/config.ldif | 1
12 files changed, 354 insertions(+), 78 deletions(-)
diff --git a/opendj-server-legacy/resource/config/config.ldif b/opendj-server-legacy/resource/config/config.ldif
index ed5aedd..db9e6bc 100644
--- a/opendj-server-legacy/resource/config/config.ldif
+++ b/opendj-server-legacy/resource/config/config.ldif
@@ -67,6 +67,7 @@
ds-cfg-allowed-task: org.opends.server.tasks.RestoreTask
ds-cfg-allowed-task: org.opends.server.tasks.ShutdownTask
ds-cfg-allowed-task: org.opends.server.tasks.PurgeConflictsHistoricalTask
+ds-cfg-allowed-task: org.opends.server.tasks.ResetChangeNumberTask
dn: cn=Schema Providers,cn=config
objectClass: top
diff --git a/opendj-server-legacy/resource/schema/02-config.ldif b/opendj-server-legacy/resource/schema/02-config.ldif
index a729e9b..f68026f 100644
--- a/opendj-server-legacy/resource/schema/02-config.ldif
+++ b/opendj-server-legacy/resource/schema/02-config.ldif
@@ -3781,6 +3781,24 @@
SYNTAX 1.3.6.1.4.1.1466.115.121.1.15
SINGLE-VALUE
X-ORIGIN 'OpenDJ Directory Server' )
+attributeTypes: ( 1.3.6.1.4.1.36733.2.1.1.145
+ NAME 'ds-task-reset-change-number-to'
+ EQUALITY integerMatch
+ SYNTAX 1.3.6.1.4.1.1466.115.121.1.27
+ SINGLE-VALUE
+ X-ORIGIN 'OpenDJ Directory Server' )
+attributeTypes: ( 1.3.6.1.4.1.36733.2.1.1.146
+ NAME 'ds-task-reset-change-number-base-dn'
+ EQUALITY distinguishedNameMatch
+ SYNTAX 1.3.6.1.4.1.1466.115.121.1.12
+ SINGLE-VALUE
+ X-ORIGIN 'OpenDJ Directory Server' )
+attributeTypes: ( 1.3.6.1.4.1.36733.2.1.1.147
+ NAME 'ds-task-reset-change-number-csn'
+ EQUALITY caseIgnoreMatch
+ SYNTAX 1.3.6.1.4.1.1466.115.121.1.15
+ SINGLE-VALUE
+ X-ORIGIN 'OpenDJ Directory Server' )
objectClasses: ( 1.3.6.1.4.1.26027.1.2.1
NAME 'ds-cfg-access-control-handler'
SUP top
@@ -5815,4 +5833,11 @@
ds-cfg-disk-low-threshold $
ds-cfg-je-property )
X-ORIGIN 'OpenDJ Directory Server' )
-
+objectClasses: ( 1.3.6.1.4.1.36733.2.1.2.27
+ NAME 'ds-task-reset-change-number'
+ SUP ds-task
+ STRUCTURAL
+ MUST ( ds-task-reset-change-number-to $
+ ds-task-reset-change-number-base-dn $
+ ds-task-reset-change-number-csn )
+ X-ORIGIN 'OpenDS Directory Server' )
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/config/ConfigConstants.java b/opendj-server-legacy/src/main/java/org/opends/server/config/ConfigConstants.java
index 6e4bb09..798a6ec 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/config/ConfigConstants.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/config/ConfigConstants.java
@@ -4421,5 +4421,16 @@
public static final String ATTR_TASK_CONFLICTS_HIST_PURGE_COUNT =
NAME_PREFIX_TASK + "purge-conflicts-historical-purged-values-count";
+ /** The name of the objectclass that will be used for a Directory Server reset change number task definition. */
+ public static final String OC_RESET_CHANGE_NUMBER_TASK = NAME_PREFIX_TASK + "reset-change-number";
+
+ /** The name of the attribute in a reset change number task that specifies the change number for the first change. */
+ public static final String ATTR_TASK_RESET_CHANGE_NUMBER_TO= NAME_PREFIX_TASK + "reset-change-number-to";
+
+ /** The name of the attribute in a reset change number task that specifies the csn of the new first change. */
+ public static final String ATTR_TASK_RESET_CHANGE_NUMBER_CSN = NAME_PREFIX_TASK + "reset-change-number-csn";
+
+ /** The name of the attribute in a reset change number task that specifies the basedn where the csn applies. */
+ public static final String ATTR_TASK_RESET_CHANGE_NUMBER_BASE_DN = NAME_PREFIX_TASK + "reset-change-number-base-dn";
}
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/api/ChangeNumberIndexDB.java b/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/api/ChangeNumberIndexDB.java
index 23be4f7..56c06b6 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/api/ChangeNumberIndexDB.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/api/ChangeNumberIndexDB.java
@@ -21,10 +21,12 @@
* CDDL HEADER END
*
*
- * Copyright 2013-2014 ForgeRock AS
+ * Copyright 2013-2015 ForgeRock AS
*/
package org.opends.server.replication.server.changelog.api;
+import org.opends.server.replication.common.CSN;
+import org.opends.server.types.DN;
/**
* This class stores an index of all the changes seen by this server in the form
@@ -98,4 +100,16 @@
DBCursor<ChangeNumberIndexRecord> getCursorFrom(long startChangeNumber)
throws ChangelogException;
+ /**
+ * Resets ChangeNumber index to the given number and CSN.
+ * @param newFirstCN
+ * the new change number to appear as first change in the external changelog
+ * @param baseDN
+ * the new record for the first change
+ * @param newFirstCSN
+ * the CSN of the new first change
+ * @throws ChangelogException
+ * if an error occurs during reset
+ */
+ void resetChangeNumberTo(long newFirstCN, DN baseDN, CSN newFirstCSN) throws ChangelogException;
}
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/FileChangeNumberIndexDB.java b/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/FileChangeNumberIndexDB.java
index 425e5f2..4ddb38f 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/FileChangeNumberIndexDB.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/FileChangeNumberIndexDB.java
@@ -31,6 +31,7 @@
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.forgerock.i18n.slf4j.LocalizedLogger;
import org.forgerock.opendj.config.server.ConfigException;
@@ -117,6 +118,10 @@
private final AtomicBoolean shutdown = new AtomicBoolean(false);
+ private final ReentrantReadWriteLock resetCNisRunningLock = new ReentrantReadWriteLock(false);
+
+ private final FileChangelogDB changelogDB;
+
/**
* Creates a new JEChangeNumberIndexDB associated to a given LDAP server.
*
@@ -124,8 +129,9 @@
* server for this domain.
* @throws ChangelogException If a database problem happened
*/
- FileChangeNumberIndexDB(ReplicationEnvironment replicationEnv) throws ChangelogException
+ FileChangeNumberIndexDB(FileChangelogDB changelogDB, ReplicationEnvironment replicationEnv) throws ChangelogException
{
+ this.changelogDB = changelogDB;
log = replicationEnv.getOrCreateCNIndexDB();
final ChangeNumberIndexRecord newestRecord = readLastRecord();
newestChangeNumber = getChangeNumber(newestRecord);
@@ -192,14 +198,30 @@
private long nextChangeNumber()
{
- return lastGeneratedChangeNumber.incrementAndGet();
+ resetCNisRunningLock.readLock().lock();
+ try {
+ long lgcn = lastGeneratedChangeNumber.incrementAndGet();
+ return lgcn;
+ }
+ finally
+ {
+ resetCNisRunningLock.readLock().unlock();
+ }
}
/** {@inheritDoc} */
@Override
public long getLastGeneratedChangeNumber()
{
- return lastGeneratedChangeNumber.get();
+ resetCNisRunningLock.readLock().lock();
+ try {
+ long lgcn = lastGeneratedChangeNumber.get();
+ return lgcn;
+ }
+ finally
+ {
+ resetCNisRunningLock.readLock().unlock();
+ }
}
/**
@@ -353,6 +375,31 @@
newestChangeNumber = NO_KEY;
}
+ /**
+ * Same as {@code clear()}, with the addition of also resetting last GeneratedChangeNumber counter to the provided
+ * value.
+ * @param newStart
+ * the new changeNumber for for the first change in the changelog
+ */
+ public void clearAndSetChangeNumber(long newStart) throws ChangelogException
+ {
+ resetCNisRunningLock.writeLock().lock();
+ try{
+ clear();
+ lastGeneratedChangeNumber.set(newStart - 1);
+ }
+ finally
+ {
+ resetCNisRunningLock.writeLock().unlock();
+ }
+ }
+
+ @Override
+ public void resetChangeNumberTo(long newFirstCN, DN baseDN, CSN newFirstCSN) throws ChangelogException
+ {
+ changelogDB.resetChangeNumberIndex(newFirstCN, baseDN, newFirstCSN);
+ }
+
/** Parser of records persisted in the FileChangeNumberIndex log. */
private static class ChangeNumberIndexDBParser implements RecordParser<Long, ChangeNumberIndexRecord>
{
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/FileChangelogDB.java b/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
index db11877..e3f71df 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
@@ -57,6 +57,7 @@
import org.opends.server.replication.server.ChangelogState;
import org.opends.server.replication.server.ReplicationServer;
import org.opends.server.replication.server.changelog.api.ChangeNumberIndexDB;
+import org.opends.server.replication.server.changelog.api.ChangeNumberIndexRecord;
import org.opends.server.replication.server.changelog.api.ChangelogDB;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.DBCursor;
@@ -344,39 +345,13 @@
return;
}
+ shutdownCNIndexerAndPurger();
+
// Remember the first exception because :
// - we want to try to remove everything we want to remove
// - then throw the first encountered exception
ChangelogException firstException = null;
- final ChangeNumberIndexer indexer = cnIndexer.getAndSet(null);
- if (indexer != null)
- {
- indexer.initiateShutdown();
- }
- final ChangelogDBPurger purger = cnPurger.getAndSet(null);
- if (purger != null)
- {
- purger.initiateShutdown();
- }
-
- // wait for shutdown of the threads holding cursors
- try
- {
- if (indexer != null)
- {
- indexer.join();
- }
- if (purger != null)
- {
- purger.join();
- }
- }
- catch (InterruptedException e)
- {
- // do nothing: we are already shutting down
- }
-
// now we can safely shutdown all DBs
try
{
@@ -411,6 +386,37 @@
}
}
+ private void shutdownCNIndexerAndPurger()
+ {
+ final ChangeNumberIndexer indexer = cnIndexer.getAndSet(null);
+ if (indexer != null)
+ {
+ indexer.initiateShutdown();
+ }
+ final ChangelogDBPurger purger = cnPurger.getAndSet(null);
+ if (purger != null)
+ {
+ purger.initiateShutdown();
+ }
+
+ // wait for shutdown of the threads holding cursors
+ try
+ {
+ if (indexer != null)
+ {
+ indexer.join();
+ }
+ if (purger != null)
+ {
+ purger.join();
+ }
+ }
+ catch (InterruptedException e)
+ {
+ // do nothing: we are already shutting down
+ }
+ }
+
/**
* Clears all records from the changelog (does not remove the changelog itself).
*
@@ -578,20 +584,7 @@
if (purgeDelayInMillis > 0)
{
- final ChangelogDBPurger newPurger = new ChangelogDBPurger();
- if (cnPurger.compareAndSet(null, newPurger))
- { // no purger was running, run this new one
- newPurger.start();
- }
- else
- { // a purger was already running, just wake that one up
- // to verify if some entries can be purged with the new purge delay
- final ChangelogDBPurger currentPurger = cnPurger.get();
- synchronized (currentPurger)
- {
- currentPurger.notify();
- }
- }
+ startCNPurger();
}
else
{
@@ -603,6 +596,24 @@
}
}
+ private void startCNPurger()
+ {
+ final ChangelogDBPurger newPurger = new ChangelogDBPurger();
+ if (cnPurger.compareAndSet(null, newPurger))
+ { // no purger was running, run this new one
+ newPurger.start();
+ }
+ else
+ { // a purger was already running, just wake that one up
+ // to verify if some entries can be purged
+ final ChangelogDBPurger currentPurger = cnPurger.get();
+ synchronized (currentPurger)
+ {
+ currentPurger.notify();
+ }
+ }
+ }
+
/** {@inheritDoc} */
@Override
public void setComputeChangeNumber(final boolean computeChangeNumber)
@@ -622,6 +633,35 @@
}
}
+ void resetChangeNumberIndex(long newFirstCN, DN baseDN, CSN newFirstCSN) throws ChangelogException
+ {
+ if (!config.isComputeChangeNumber())
+ {
+ throw new ChangelogException(ERR_REPLICATION_CHANGE_NUMBER_DISABLED.get(baseDN));
+ }
+ if (!getDomainNewestCSNs(baseDN).cover(newFirstCSN))
+ {
+ throw new ChangelogException(ERR_CHANGELOG_RESET_CHANGE_NUMBER_CHANGE_NOT_PRESENT.get(newFirstCN, baseDN,
+ newFirstCSN));
+ }
+ if (getDomainOldestCSNs(baseDN).getCSN(newFirstCSN.getServerId()).isNewerThan(newFirstCSN))
+ {
+ throw new ChangelogException(ERR_CHANGELOG_RESET_CHANGE_NUMBER_CSN_TOO_OLD.get(newFirstCN, newFirstCSN));
+ }
+
+ shutdownCNIndexerAndPurger();
+ synchronized (cnIndexDBLock)
+ {
+ cnIndexDB.clearAndSetChangeNumber(newFirstCN);
+ cnIndexDB.addRecord(new ChangeNumberIndexRecord(newFirstCN, baseDN, newFirstCSN));
+ }
+ startIndexer();
+ if (purgeDelayInMillis > 0)
+ {
+ startCNPurger();
+ }
+ }
+
private void startIndexer()
{
final ChangeNumberIndexer indexer = new ChangeNumberIndexer(this, replicationEnv);
@@ -631,7 +671,6 @@
}
}
- /** {@inheritDoc} */
@Override
public ChangeNumberIndexDB getChangeNumberIndexDB()
{
@@ -641,7 +680,7 @@
{
try
{
- cnIndexDB = new FileChangeNumberIndexDB(replicationEnv);
+ cnIndexDB = new FileChangeNumberIndexDB(this, replicationEnv);
}
catch (Exception e)
{
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDB.java b/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDB.java
index e405337..b247f04 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDB.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDB.java
@@ -26,6 +26,8 @@
*/
package org.opends.server.replication.server.changelog.je;
+import static org.opends.messages.ReplicationMessages.*;
+
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -375,4 +377,10 @@
newestChangeNumber = NO_KEY;
}
+ @Override
+ public void resetChangeNumberTo(long newFirstCN, DN baseDN, CSN newFirstCSN) throws ChangelogException
+ {
+ throw new ChangelogException(ERR_CHANGELOG_RESET_CHANGE_NUMBER_UNSUPPORTED.get());
+ }
+
}
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/tasks/PurgeConflictsHistoricalTask.java b/opendj-server-legacy/src/main/java/org/opends/server/tasks/PurgeConflictsHistoricalTask.java
index 6189809..7d76769 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/tasks/PurgeConflictsHistoricalTask.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/tasks/PurgeConflictsHistoricalTask.java
@@ -77,15 +77,6 @@
private TaskState initState;
- private static void debugInfo(String s)
- {
- if (logger.isTraceEnabled())
- {
- System.out.println(LocalizableMessage.raw(s));
- logger.trace(s);
- }
- }
-
/** {@inheritDoc} */
@Override
public LocalizableMessage getDisplayName() {
@@ -146,12 +137,8 @@
protected TaskState runTask()
{
Boolean purgeCompletedInTime = false;
- if (logger.isTraceEnabled())
- {
- debugInfo("[PURGE] PurgeConflictsHistoricalTask is starting "
- + "on domain: " + domain.getBaseDN()
- + "max duration (sec):" + purgeTaskMaxDurationInSec);
- }
+ logger.trace("PurgeConflictsHistoricalTask is starting on domain: %s max duration (sec): %d",
+ domain.getBaseDN(), purgeTaskMaxDurationInSec);
try
{
replaceAttributeValue(ATTR_TASK_CONFLICTS_HIST_PURGE_COMPLETED_IN_TIME, purgeCompletedInTime.toString());
@@ -166,7 +153,7 @@
}
catch(DirectoryException de)
{
- debugInfo("[PURGE] PurgeConflictsHistoricalTask exception " + de.getLocalizedMessage());
+ logger.trace("PurgeConflictsHistoricalTask exception %s", de.getLocalizedMessage());
if (de.getResultCode() != ResultCode.ADMIN_LIMIT_EXCEEDED)
{
// Error raised at submission time
@@ -185,20 +172,17 @@
// sets in the attributes the last stats values
replaceAttributeValue(ATTR_TASK_CONFLICTS_HIST_PURGE_COUNT, String.valueOf(purgeCount));
replaceAttributeValue(ATTR_TASK_CONFLICTS_HIST_PURGE_LAST_CSN, lastCSN.toStringUI());
- debugInfo("[PURGE] PurgeConflictsHistoricalTask write attrs ");
+ logger.trace("PurgeConflictsHistoricalTask write attrs %d", purgeCount);
}
catch(Exception e)
{
- debugInfo("[PURGE] PurgeConflictsHistoricalTask exception " + e.getLocalizedMessage());
+ logger.trace("PurgeConflictsHistoricalTask exception %s", e.getLocalizedMessage());
initState = TaskState.STOPPED_BY_ERROR;
}
}
- if (logger.isTraceEnabled())
- {
- debugInfo("[PURGE] PurgeConflictsHistoricalTask is ending with state:" + initState +
- " completedInTime:" + purgeCompletedInTime);
- }
+ logger.trace("PurgeConflictsHistoricalTask is ending with state: %s completedInTime: %s",
+ initState, purgeCompletedInTime);
return initState;
}
@@ -231,12 +215,12 @@
{
replaceAttributeValue(ATTR_TASK_CONFLICTS_HIST_PURGE_COUNT, String.valueOf(purgeCount));
replaceAttributeValue(ATTR_TASK_CONFLICTS_HIST_PURGE_LAST_CSN, lastCSN.toStringUI());
- debugInfo("[PURGE] PurgeConflictsHistoricalTask write attrs " + purgeCount);
+ logger.trace("PurgeConflictsHistoricalTask write attrs %d", purgeCount);
}
}
catch(DirectoryException de)
{
- debugInfo("[PURGE] PurgeConflictsHistoricalTask exception " + de.getLocalizedMessage());
+ logger.trace("PurgeConflictsHistoricalTask exception %s", de.getLocalizedMessage());
initState = TaskState.STOPPED_BY_ERROR;
}
}
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/tasks/ResetChangeNumberTask.java b/opendj-server-legacy/src/main/java/org/opends/server/tasks/ResetChangeNumberTask.java
new file mode 100644
index 0000000..df6b2f8
--- /dev/null
+++ b/opendj-server-legacy/src/main/java/org/opends/server/tasks/ResetChangeNumberTask.java
@@ -0,0 +1,136 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at legal-notices/CDDLv1_0.txt.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ * Copyright 2015 ForgeRock AS
+ */
+package org.opends.server.tasks;
+
+import org.forgerock.i18n.LocalizableMessage;
+import org.forgerock.i18n.slf4j.LocalizedLogger;
+import org.opends.server.backends.task.Task;
+import org.opends.server.backends.task.TaskState;
+import org.opends.server.replication.common.CSN;
+import org.opends.server.replication.server.ReplicationServer;
+import org.opends.server.replication.server.changelog.api.ChangelogException;
+import org.opends.server.types.Attribute;
+import org.opends.server.types.AttributeType;
+import org.opends.server.types.DN;
+import org.opends.server.types.DirectoryException;
+import org.opends.server.types.Entry;
+
+import java.util.List;
+
+import static org.forgerock.opendj.ldap.ResultCode.*;
+import static org.opends.server.config.ConfigConstants.ATTR_TASK_RESET_CHANGE_NUMBER_BASE_DN;
+import static org.opends.server.config.ConfigConstants.ATTR_TASK_RESET_CHANGE_NUMBER_CSN;
+import static org.opends.server.config.ConfigConstants.ATTR_TASK_RESET_CHANGE_NUMBER_TO;
+import static org.opends.server.core.DirectoryServer.getAttributeTypeOrDefault;
+import static org.opends.messages.TaskMessages.*;
+
+/**
+ * This class provides an implementation of a Directory Server task that can
+ * be used to rebuild the change number index with a given change number and a
+ * change represented by its CSN.
+ */
+public class ResetChangeNumberTask extends Task
+{
+ private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
+
+ private int newFirstChangeNumber;
+ private DN baseDN;
+ private CSN newFirstCSN;
+ private ReplicationServer targetRS;
+
+ @Override
+ public LocalizableMessage getDisplayName() {
+ return INFO_TASK_RESET_CHANGE_NUMBER.get();
+ }
+
+ @Override public void initializeTask() throws DirectoryException
+ {
+ if (TaskState.isDone(getTaskState()))
+ {
+ return;
+ }
+
+ final Entry taskEntry = getTaskEntry();
+ newFirstChangeNumber = TaskUtils.getSingleValueInteger(
+ getTaskParameter(taskEntry, ATTR_TASK_RESET_CHANGE_NUMBER_TO), 1);
+ newFirstCSN = CSN.valueOf(TaskUtils.getSingleValueString(
+ getTaskParameter(taskEntry, ATTR_TASK_RESET_CHANGE_NUMBER_CSN)));
+ baseDN = DN.valueOf(TaskUtils.getSingleValueString(
+ getTaskParameter(taskEntry, ATTR_TASK_RESET_CHANGE_NUMBER_BASE_DN)));
+
+ if (newFirstChangeNumber < 1)
+ {
+ throw new DirectoryException(UNWILLING_TO_PERFORM,
+ ERR_TASK_RESET_CHANGE_NUMBER_INVALID.get(newFirstChangeNumber));
+ }
+
+ List<ReplicationServer> allRSes = ReplicationServer.getAllInstances();
+ if (allRSes.isEmpty())
+ {
+ throw new DirectoryException(NO_SUCH_OBJECT, ERR_TASK_RESET_CHANGE_NUMBER_NO_RSES.get());
+ }
+
+ for (ReplicationServer rs : allRSes)
+ {
+ if (rs.getReplicationServerDomain(baseDN) != null)
+ {
+ targetRS = rs;
+ return;
+ }
+ }
+ throw new DirectoryException(NO_SUCH_OBJECT, ERR_TASK_RESET_CHANGE_NUMBER_CHANGELOG_NOT_FOUND.get(baseDN));
+ }
+
+ private List<Attribute> getTaskParameter(Entry taskEntry, String attrTaskResetChangeNumberTo)
+ {
+ AttributeType taskAttr = getAttributeTypeOrDefault(attrTaskResetChangeNumberTo);
+ return taskEntry.getAttribute(taskAttr);
+ }
+
+ @Override
+ protected TaskState runTask()
+ {
+ logger.trace("Reset change number task is starting with new changeNumber %d having CSN %s",
+ newFirstChangeNumber, newFirstCSN);
+
+ try
+ {
+ targetRS.getChangelogDB().getChangeNumberIndexDB().resetChangeNumberTo(newFirstChangeNumber, baseDN, newFirstCSN);
+ return returnWithDebug(TaskState.COMPLETED_SUCCESSFULLY);
+ }
+ catch (ChangelogException ce)
+ {
+ logger.error(ERR_TASK_RESET_CHANGE_NUMBER_FAILED, ce.getMessageObject());
+ return returnWithDebug(TaskState.STOPPED_BY_ERROR);
+ }
+ }
+
+ private TaskState returnWithDebug(TaskState state)
+ {
+ logger.trace("state: %s", state);
+ return state;
+ }
+}
diff --git a/opendj-server-legacy/src/messages/org/opends/messages/replication.properties b/opendj-server-legacy/src/messages/org/opends/messages/replication.properties
index 3184d97..cfdbfc2 100644
--- a/opendj-server-legacy/src/messages/org/opends/messages/replication.properties
+++ b/opendj-server-legacy/src/messages/org/opends/messages/replication.properties
@@ -525,8 +525,6 @@
change %s to replicaDB %s %s because: %s
ERR_COULD_NOT_ADD_CHANGE_TO_SHUTTING_DOWN_REPLICA_DB_240=Could not add \
change %s to replicaDB %s %s because flushing thread is shutting down
-NOTE_SEARCH_CHANGELOG_INSUFFICIENT_PRIVILEGES_285=You do not have sufficient privileges to \
- perform a search request on cn=changelog
ERR_CHANGELOG_READ_STATE_WRONG_ROOT_PATH_241=Error when retrieving changelog \
state from root path '%s' : directory might not exist
ERR_CHANGELOG_READ_STATE_NO_GENERATION_ID_FOUND_242=Error when retrieving \
@@ -631,4 +629,9 @@
ERR_CHANGELOG_CURSOR_ABORTED_290=Cursor on log '%s' has been aborted after \
a purge or a clear
ERR_CHANGELOG_CANNOT_READ_NEWEST_RECORD_291=Could not position and read newest record from log file '%s'
-
+ERR_CHANGELOG_RESET_CHANGE_NUMBER_UNSUPPORTED_292=The JE based changelog does not support resetting the change number
+ERR_CHANGELOG_RESET_CHANGE_NUMBER_CHANGE_NOT_PRESENT_293=The change number index could not be reset to start with %d \
+ in base DN '%s' because starting CSN '%s' does not exist in the change log
+ERR_CHANGELOG_RESET_CHANGE_NUMBER_CSN_TOO_OLD_294=The change number could not be reset to %d because the associated \
+ change with CSN '%s' has already been purged from the change log. Try resetting to a more recent change
+ERR_REPLICATION_CHANGE_NUMBER_DISABLED_295=Change number indexing is disabled for replication domain '%s'
diff --git a/opendj-server-legacy/src/messages/org/opends/messages/task.properties b/opendj-server-legacy/src/messages/org/opends/messages/task.properties
index a42ac60..7846847 100644
--- a/opendj-server-legacy/src/messages/org/opends/messages/task.properties
+++ b/opendj-server-legacy/src/messages/org/opends/messages/task.properties
@@ -196,4 +196,11 @@
the rebuildAll or rebuildDegraded option is used
INFO_TASK_PURGE_CONFLICTS_HIST_NAME_109=Purge conflicts historical
ERR_TASK_INVALID_ATTRIBUTE_VALUE_110=Attribute %s has an invalid value. \
-Reason: %s
\ No newline at end of file
+Reason: %s
+INFO_TASK_RESET_CHANGE_NUMBER_111=Reset change number index to begin with a given number and change.
+ERR_TASK_RESET_CHANGE_NUMBER_CHANGELOG_NOT_FOUND_112=No changelog database was found for baseDN '%s'. Either the \
+ baseDN is not replicated or its changelog has not been enabled in this server.
+ERR_TASK_RESET_CHANGE_NUMBER_NO_RSES_113=The change number index cannot be reset because this OpenDJ instance \
+ does not appear to be a replication server
+ERR_TASK_RESET_CHANGE_NUMBER_INVALID_114=Invalid change number (%d) specified, it must be greater than zero
+ERR_TASK_RESET_CHANGE_NUMBER_FAILED_115=Unable to reset the change number index: %s
diff --git a/opendj-server-legacy/tests/unit-tests-testng/resource/config-small.ldif b/opendj-server-legacy/tests/unit-tests-testng/resource/config-small.ldif
index 3dd77d6..a3a5e74 100644
--- a/opendj-server-legacy/tests/unit-tests-testng/resource/config-small.ldif
+++ b/opendj-server-legacy/tests/unit-tests-testng/resource/config-small.ldif
@@ -67,6 +67,7 @@
ds-cfg-allowed-task: org.opends.server.tasks.RestoreTask
ds-cfg-allowed-task: org.opends.server.tasks.ShutdownTask
ds-cfg-allowed-task: org.opends.server.tasks.PurgeConflictsHistoricalTask
+ds-cfg-allowed-task: org.opends.server.tasks.ResetChangeNumberTask
dn: cn=Schema Providers,cn=config
objectClass: top
--
Gitblit v1.10.0