/*
|
* CDDL HEADER START
|
*
|
* The contents of this file are subject to the terms of the
|
* Common Development and Distribution License, Version 1.0 only
|
* (the "License"). You may not use this file except in compliance
|
* with the License.
|
*
|
* You can obtain a copy of the license at
|
* trunk/opends/resource/legal-notices/OpenDS.LICENSE
|
* or https://OpenDS.dev.java.net/OpenDS.LICENSE.
|
* See the License for the specific language governing permissions
|
* and limitations under the License.
|
*
|
* When distributing Covered Code, include this CDDL HEADER in each
|
* file and include the License file at
|
* trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
|
* add the following below this CDDL HEADER, with the fields enclosed
|
* by brackets "[]" replaced with your own identifying information:
|
* Portions Copyright [yyyy] [name of copyright owner]
|
*
|
* CDDL HEADER END
|
*
|
*
|
* Copyright 2006-2010 Sun Microsystems, Inc.
|
*/
|
package org.opends.server.tasks;
|
import org.opends.server.replication.plugin.LDAPReplicationDomain;
|
|
import org.opends.server.types.ResultCode;
|
|
import org.opends.messages.MessageBuilder;
|
|
|
import org.opends.messages.Message;
|
import org.opends.messages.Category;
|
import org.opends.messages.Severity;
|
|
import static org.opends.server.config.ConfigConstants.*;
|
import static org.opends.server.core.DirectoryServer.getAttributeType;
|
import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
|
import static org.opends.server.loggers.debug.DebugLogger.getTracer;
|
import java.util.List;
|
|
import org.opends.messages.TaskMessages;
|
import org.opends.server.backends.task.Task;
|
import org.opends.server.backends.task.TaskState;
|
import org.opends.server.loggers.debug.DebugTracer;
|
import org.opends.server.types.Attribute;
|
import org.opends.server.types.AttributeType;
|
import org.opends.server.types.DN;
|
import org.opends.server.types.DirectoryException;
|
import org.opends.server.types.Entry;
|
import org.opends.server.util.TimeThread;
|
import org.opends.server.replication.common.ChangeNumber;
|
|
/**
|
* This class provides an implementation of a Directory Server task that can
|
* be used to purge the replication historical informations stored in the
|
* user entries to solve conflicts.
|
*/
|
public class PurgeConflictsHistoricalTask extends Task
|
{
|
/**
|
* The default value for the maximum duration of the purge expressed in
|
* seconds.
|
*/
|
public static final int DEFAULT_MAX_DURATION = 60 * 60;
|
/**
|
* The tracer object for the debug logger.
|
*/
|
private static final DebugTracer TRACER = getTracer();
|
|
private String domainString = null;
|
private LDAPReplicationDomain domain = null;
|
|
/**
|
* current historical purge delay
|
* <--------------------------------->
|
* -----------------------------------------------------------------> t
|
* | | |
|
* current task task
|
* CN being purged start date max end date
|
* <------------>
|
* config.purgeMaxDuration
|
*
|
* The task will start purging the change with the oldest CN found.
|
* The task run as long as :
|
* - the end date (computed from the configured max duration) is not reached
|
* - the CN purged is oldest than the configured historical purge delay
|
*
|
*
|
*/
|
|
private int purgeTaskMaxDurationInSec = DEFAULT_MAX_DURATION;
|
|
TaskState initState;
|
|
|
private static final void debugInfo(String s)
|
{
|
if (debugEnabled())
|
{
|
System.out.println(Message.raw(Category.SYNC, Severity.NOTICE, s));
|
TRACER.debugInfo(s);
|
}
|
}
|
|
/**
|
* {@inheritDoc}
|
*/
|
@Override
|
public Message getDisplayName() {
|
return TaskMessages.INFO_TASK_PURGE_CONFLICTS_HIST_NAME.get();
|
}
|
|
/**
|
* {@inheritDoc}
|
*/
|
@Override public void initializeTask() throws DirectoryException
|
{
|
if (TaskState.isDone(getTaskState()))
|
{
|
return;
|
}
|
|
// FIXME -- Do we need any special authorization here?
|
Entry taskEntry = getTaskEntry();
|
|
AttributeType typeDomainBase;
|
typeDomainBase =
|
getAttributeType(ATTR_TASK_CONFLICTS_HIST_PURGE_DOMAIN_DN, true);
|
|
List<Attribute> attrList;
|
attrList = taskEntry.getAttribute(typeDomainBase);
|
domainString = TaskUtils.getSingleValueString(attrList);
|
|
try
|
{
|
DN dn = DN.decode(domainString);
|
// We can assume that this is an LDAP replication domain
|
domain = LDAPReplicationDomain.retrievesReplicationDomain(dn);
|
}
|
catch(DirectoryException e)
|
{
|
MessageBuilder mb = new MessageBuilder();
|
mb.append(TaskMessages.ERR_TASK_INITIALIZE_INVALID_DN.get());
|
mb.append(e.getMessage());
|
throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
|
mb.toMessage());
|
}
|
|
AttributeType typeMaxDuration;
|
typeMaxDuration =
|
getAttributeType(ATTR_TASK_CONFLICTS_HIST_PURGE_MAX_DURATION, true);
|
attrList = taskEntry.getAttribute(typeMaxDuration);
|
String maxDurationStringInSec = TaskUtils.getSingleValueString(attrList);
|
|
if (maxDurationStringInSec != null)
|
{
|
try
|
{
|
purgeTaskMaxDurationInSec = Integer.decode(maxDurationStringInSec);
|
}
|
catch(Exception e)
|
{
|
throw new DirectoryException(
|
ResultCode.UNWILLING_TO_PERFORM,
|
TaskMessages.ERR_TASK_INVALID_ATTRIBUTE_VALUE.get(
|
ATTR_TASK_CONFLICTS_HIST_PURGE_MAX_DURATION, e.getLocalizedMessage()));
|
}
|
}
|
}
|
|
/**
|
* {@inheritDoc}
|
*/
|
@Override
|
protected TaskState runTask()
|
{
|
Boolean purgeCompletedInTime = false;
|
if (debugEnabled())
|
{
|
debugInfo("[PURGE] PurgeConflictsHistoricalTask is starting "
|
+ "on domain: " + domain.getServiceID()
|
+ "max duration (sec):" + purgeTaskMaxDurationInSec);
|
}
|
try
|
{
|
replaceAttributeValue(ATTR_TASK_CONFLICTS_HIST_PURGE_COMPLETED_IN_TIME,
|
purgeCompletedInTime.toString());
|
|
// launch the task
|
domain.purgeConflictsHistorical(this,
|
TimeThread.getTime() + (purgeTaskMaxDurationInSec*1000));
|
|
purgeCompletedInTime = true;
|
replaceAttributeValue(ATTR_TASK_CONFLICTS_HIST_PURGE_COMPLETED_IN_TIME,
|
purgeCompletedInTime.toString());
|
|
initState = TaskState.COMPLETED_SUCCESSFULLY;
|
}
|
catch(DirectoryException de)
|
{
|
debugInfo("[PURGE] PurgeConflictsHistoricalTask exception " +
|
de.getLocalizedMessage());
|
if (de.getResultCode() != ResultCode.ADMIN_LIMIT_EXCEEDED)
|
{
|
// Error raised at submission time
|
logError(de.getMessageObject());
|
initState = TaskState.STOPPED_BY_ERROR;
|
}
|
else
|
{
|
initState = TaskState.COMPLETED_SUCCESSFULLY;
|
}
|
}
|
finally
|
{
|
try
|
{
|
// sets in the attributes the last stats values
|
replaceAttributeValue(ATTR_TASK_CONFLICTS_HIST_PURGE_COUNT,
|
String.valueOf(this.purgeCount));
|
replaceAttributeValue(ATTR_TASK_CONFLICTS_HIST_PURGE_LAST_CN,
|
this.lastCN.toStringUI());
|
debugInfo("[PURGE] PurgeConflictsHistoricalTask write attrs ");
|
}
|
catch(Exception e)
|
{
|
debugInfo("[PURGE] PurgeConflictsHistoricalTask exception " +
|
e.getLocalizedMessage());
|
initState = TaskState.STOPPED_BY_ERROR;
|
}
|
}
|
|
if (debugEnabled())
|
{
|
debugInfo("[PURGE] PurgeConflictsHistoricalTask is ending " +
|
"with state:" + initState.toString() +
|
" completedInTime:" + purgeCompletedInTime);
|
}
|
return initState;
|
}
|
|
int updateAttrPeriod = 0;
|
ChangeNumber lastCN;
|
int purgeCount;
|
|
/**
|
* Set the last changenumber purged and the count of purged values in order
|
* to monitor the historical purge.
|
* @param lastCN the last changeNumber purged.
|
* @param purgeCount the count of purged values.
|
*/
|
public void setProgressStats(ChangeNumber lastCN, int purgeCount)
|
{
|
try
|
{
|
if (purgeCount == 0)
|
replaceAttributeValue(ATTR_TASK_CONFLICTS_HIST_PURGE_FIRST_CN,
|
lastCN.toStringUI());
|
|
// we don't want the update of the task to overload too much task duration
|
this.purgeCount = purgeCount;
|
this.lastCN = lastCN;
|
if (++updateAttrPeriod % 100 == 0)
|
{
|
replaceAttributeValue(ATTR_TASK_CONFLICTS_HIST_PURGE_COUNT,
|
String.valueOf(purgeCount));
|
|
replaceAttributeValue(ATTR_TASK_CONFLICTS_HIST_PURGE_LAST_CN,
|
lastCN.toStringUI());
|
debugInfo("[PURGE] PurgeConflictsHistoricalTask write attrs "
|
+ purgeCount);
|
}
|
}
|
catch(DirectoryException de)
|
{
|
debugInfo("[PURGE] PurgeConflictsHistoricalTask exception " +
|
de.getLocalizedMessage());
|
initState = TaskState.STOPPED_BY_ERROR;
|
}
|
}
|
}
|