From 2273c26793fe6e3abfd90a400823e8e46b3303bb Mon Sep 17 00:00:00 2001
From: abobrov <abobrov@localhost>
Date: Mon, 15 Dec 2008 16:07:29 +0000
Subject: [PATCH] - [Issue 274] Recurring Tasks
---
opends/src/server/org/opends/server/backends/task/TaskScheduler.java | 184 +++++++++++++++++++++++++++++++++++++--------
1 files changed, 150 insertions(+), 34 deletions(-)
diff --git a/opends/src/server/org/opends/server/backends/task/TaskScheduler.java b/opends/src/server/org/opends/server/backends/task/TaskScheduler.java
index 7dcd46b..6c30a95 100644
--- a/opends/src/server/org/opends/server/backends/task/TaskScheduler.java
+++ b/opends/src/server/org/opends/server/backends/task/TaskScheduler.java
@@ -37,6 +37,7 @@
import java.io.File;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
@@ -55,6 +56,7 @@
import org.opends.server.types.Attribute;
import org.opends.server.types.AttributeType;
import org.opends.server.types.AttributeValue;
+import org.opends.server.types.Attributes;
import org.opends.server.types.DN;
import org.opends.server.types.DebugLogLevel;
import org.opends.server.types.DirectoryException;
@@ -191,6 +193,27 @@
DirectoryServer.registerAlertGenerator(this);
initializeTasksFromBackingFile();
+
+ for (RecurringTask recurringTask : recurringTasks.values()) {
+ Task task = null;
+ try {
+ task = recurringTask.scheduleNextIteration();
+ } catch (DirectoryException de) {
+ logError(de.getMessageObject());
+ }
+ if (task != null) {
+ try {
+ scheduleTask(task, false);
+ } catch (DirectoryException de) {
+ // This task might have been already scheduled from before
+ // and thus got initialized from backing file, otherwise
+ // log error and continue.
+ if (de.getResultCode() != ResultCode.ENTRY_ALREADY_EXISTS) {
+ logError(de.getMessageObject());
+ }
+ }
+ }
+ }
}
@@ -224,7 +247,12 @@
throw new DirectoryException(ResultCode.ENTRY_ALREADY_EXISTS, message);
}
- recurringTasks.put(id, recurringTask);
+ Attribute attr = Attributes.create(ATTR_TASK_STATE,
+ TaskState.RECURRING.toString());
+ ArrayList<Attribute> attrList = new ArrayList<Attribute>(1);
+ attrList.add(attr);
+ Entry recurringTaskEntry = recurringTask.getRecurringTaskEntry();
+ recurringTaskEntry.putAttribute(attr.getAttributeType(), attrList);
if (scheduleIteration)
{
@@ -236,6 +264,7 @@
}
}
+ recurringTasks.put(id, recurringTask);
writeState();
}
finally
@@ -264,24 +293,19 @@
try
{
+ RecurringTask recurringTask = recurringTasks.remove(recurringTaskID);
+ writeState();
+
for (Task t : tasks.values())
{
if ((t.getRecurringTaskID() != null) &&
(t.getRecurringTaskID().equals(recurringTaskID)) &&
- (! TaskState.isDone(t.getTaskState())))
+ (!TaskState.isDone(t.getTaskState())))
{
- Message message = ERR_TASKSCHED_REMOVE_RECURRING_EXISTING_ITERATION.
- get(String.valueOf(recurringTaskID),
- String.valueOf(t.getTaskID()));
- throw new DirectoryException(
- ResultCode.UNWILLING_TO_PERFORM, message);
+ cancelTask(t.getTaskID());
}
}
-
- RecurringTask recurringTask = recurringTasks.remove(recurringTaskID);
- writeState();
-
return recurringTask;
}
finally
@@ -348,7 +372,15 @@
}
else if (TaskState.isDone(state))
{
- completedTasks.add(task);
+ if ((state == TaskState.CANCELED_BEFORE_STARTING) &&
+ task.isRecurring())
+ {
+ pendingTasks.add(task);
+ }
+ else
+ {
+ completedTasks.add(task);
+ }
}
else
{
@@ -459,6 +491,53 @@
/**
+ * Removes the specified pending iteration of recurring task. It will
+ * be removed from the task set but still be kept in the pending set.
+ *
+ * @param taskID The task ID of the pending iteration to remove.
+ *
+ * @return The task that was removed.
+ *
+ * @throws DirectoryException If the requested task is not in the
+ * pending queue.
+ */
+ public Task removeRecurringTaskIteration(String taskID)
+ throws DirectoryException
+ {
+ schedulerLock.lock();
+
+ try
+ {
+ Task t = tasks.get(taskID);
+ if (t == null)
+ {
+ Message message = ERR_TASKSCHED_REMOVE_PENDING_NO_SUCH_TASK.get(
+ String.valueOf(taskID));
+ throw new DirectoryException(ResultCode.NO_SUCH_OBJECT, message);
+ }
+
+ if (TaskState.isPending(t.getTaskState()))
+ {
+ tasks.remove(taskID);
+ writeState();
+ return t;
+ }
+ else
+ {
+ Message message = ERR_TASKSCHED_REMOVE_PENDING_NOT_PENDING.get(
+ String.valueOf(taskID));
+ throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, message);
+ }
+ }
+ finally
+ {
+ schedulerLock.unlock();
+ }
+ }
+
+
+
+ /**
* Removes the specified completed task.
*
* @param taskID The task ID of the completed task to remove.
@@ -546,7 +625,12 @@
}
else
{
- Task newIteration = recurringTask.scheduleNextIteration();
+ Task newIteration = null;
+ try {
+ newIteration = recurringTask.scheduleNextIteration();
+ } catch (DirectoryException de) {
+ logError(de.getMessageObject());
+ }
if (newIteration != null)
{
// FIXME -- What to do if new iteration is null?
@@ -746,6 +830,7 @@
* Operates in a loop, launching tasks at the appropriate time and performing
* any necessary periodic cleanup.
*/
+ @Override
public void run()
{
isRunning = true;
@@ -797,6 +882,31 @@
long waitTime = t.getScheduledStartTime() - TimeThread.getTime();
sleepTime = Math.min(sleepTime, waitTime);
}
+ // Recurring task iteration has to spawn the next one
+ // even if the current iteration has been canceled.
+ else if ((state == TaskState.CANCELED_BEFORE_STARTING) &&
+ t.isRecurring())
+ {
+ if (t.getScheduledStartTime() > TimeThread.getTime()) {
+ // If we're waiting for the start time to arrive,
+ // then see if that will come before the next
+ // sleep time is up.
+ long waitTime =
+ t.getScheduledStartTime() - TimeThread.getTime();
+ sleepTime = Math.min(sleepTime, waitTime);
+ } else {
+ TaskThread taskThread;
+ if (idleThreads.isEmpty()) {
+ taskThread = new TaskThread(this, nextThreadID++);
+ taskThread.start();
+ } else {
+ taskThread = idleThreads.removeFirst();
+ }
+ runningTasks.add(t);
+ activeThreads.put(t.getTaskID(), taskThread);
+ taskThread.setTask(t);
+ }
+ }
if (state != t.getTaskState())
{
@@ -876,7 +986,13 @@
{
// If the task has finished we don't want to restart it
TaskState state = task.getTaskState();
- if (state != null && TaskState.isDone(state))
+
+ // Reset task state if recurring.
+ if (state == TaskState.RECURRING) {
+ state = null;
+ }
+
+ if ((state != null) && TaskState.isDone(state))
{
return state;
}
@@ -1011,26 +1127,6 @@
String.valueOf(taskBackend.getTaskRootDN()));
logError(message);
}
- else if (parentDN.equals(taskBackend.getRecurringTasksParentDN()))
- {
- try
- {
- RecurringTask recurringTask = entryToRecurringTask(entry);
- addRecurringTask(recurringTask, false);
- }
- catch (DirectoryException de)
- {
- if (debugEnabled())
- {
- TRACER.debugCaught(DebugLogLevel.ERROR, de);
- }
-
- Message message =
- ERR_TASKSCHED_CANNOT_SCHEDULE_RECURRING_TASK_FROM_ENTRY.
- get(String.valueOf(entryDN), de.getMessageObject());
- logError(message);
- }
- }
else if (parentDN.equals(taskBackend.getScheduledTasksParentDN()))
{
try
@@ -1057,6 +1153,26 @@
logError(message);
}
}
+ else if (parentDN.equals(taskBackend.getRecurringTasksParentDN()))
+ {
+ try
+ {
+ RecurringTask recurringTask = entryToRecurringTask(entry);
+ addRecurringTask(recurringTask, false);
+ }
+ catch (DirectoryException de)
+ {
+ if (debugEnabled())
+ {
+ TRACER.debugCaught(DebugLogLevel.ERROR, de);
+ }
+
+ Message message =
+ ERR_TASKSCHED_CANNOT_SCHEDULE_RECURRING_TASK_FROM_ENTRY.
+ get(String.valueOf(entryDN), de.getMessageObject());
+ logError(message);
+ }
+ }
else
{
Message message = ERR_TASKSCHED_INVALID_TASK_ENTRY_DN.get(
--
Gitblit v1.10.0