From 2273c26793fe6e3abfd90a400823e8e46b3303bb Mon Sep 17 00:00:00 2001
From: abobrov <abobrov@localhost>
Date: Mon, 15 Dec 2008 16:07:29 +0000
Subject: [PATCH] - [Issue 274]  Recurring Tasks

---
 opends/src/server/org/opends/server/backends/task/TaskScheduler.java |  184 +++++++++++++++++++++++++++++++++++++--------
 1 files changed, 150 insertions(+), 34 deletions(-)

diff --git a/opends/src/server/org/opends/server/backends/task/TaskScheduler.java b/opends/src/server/org/opends/server/backends/task/TaskScheduler.java
index 7dcd46b..6c30a95 100644
--- a/opends/src/server/org/opends/server/backends/task/TaskScheduler.java
+++ b/opends/src/server/org/opends/server/backends/task/TaskScheduler.java
@@ -37,6 +37,7 @@
 
 import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
@@ -55,6 +56,7 @@
 import org.opends.server.types.Attribute;
 import org.opends.server.types.AttributeType;
 import org.opends.server.types.AttributeValue;
+import org.opends.server.types.Attributes;
 import org.opends.server.types.DN;
 import org.opends.server.types.DebugLogLevel;
 import org.opends.server.types.DirectoryException;
@@ -191,6 +193,27 @@
     DirectoryServer.registerAlertGenerator(this);
 
     initializeTasksFromBackingFile();
+
+    for (RecurringTask recurringTask : recurringTasks.values()) {
+      Task task = null;
+      try {
+        task = recurringTask.scheduleNextIteration();
+      } catch (DirectoryException de) {
+        logError(de.getMessageObject());
+      }
+      if (task != null) {
+        try {
+          scheduleTask(task, false);
+        } catch (DirectoryException de) {
+          // This task might have been already scheduled from before
+          // and thus got initialized from backing file, otherwise
+          // log error and continue.
+          if (de.getResultCode() != ResultCode.ENTRY_ALREADY_EXISTS) {
+            logError(de.getMessageObject());
+          }
+        }
+      }
+    }
   }
 
 
@@ -224,7 +247,12 @@
         throw new DirectoryException(ResultCode.ENTRY_ALREADY_EXISTS, message);
       }
 
-      recurringTasks.put(id, recurringTask);
+      Attribute attr = Attributes.create(ATTR_TASK_STATE,
+        TaskState.RECURRING.toString());
+      ArrayList<Attribute> attrList = new ArrayList<Attribute>(1);
+      attrList.add(attr);
+      Entry recurringTaskEntry = recurringTask.getRecurringTaskEntry();
+      recurringTaskEntry.putAttribute(attr.getAttributeType(), attrList);
 
       if (scheduleIteration)
       {
@@ -236,6 +264,7 @@
         }
       }
 
+      recurringTasks.put(id, recurringTask);
       writeState();
     }
     finally
@@ -264,24 +293,19 @@
 
     try
     {
+      RecurringTask recurringTask = recurringTasks.remove(recurringTaskID);
+      writeState();
+
       for (Task t : tasks.values())
       {
         if ((t.getRecurringTaskID() != null) &&
             (t.getRecurringTaskID().equals(recurringTaskID)) &&
-            (! TaskState.isDone(t.getTaskState())))
+            (!TaskState.isDone(t.getTaskState())))
         {
-          Message message = ERR_TASKSCHED_REMOVE_RECURRING_EXISTING_ITERATION.
-              get(String.valueOf(recurringTaskID),
-                  String.valueOf(t.getTaskID()));
-          throw new DirectoryException(
-                  ResultCode.UNWILLING_TO_PERFORM, message);
+          cancelTask(t.getTaskID());
         }
       }
 
-
-      RecurringTask recurringTask = recurringTasks.remove(recurringTaskID);
-      writeState();
-
       return recurringTask;
     }
     finally
@@ -348,7 +372,15 @@
       }
       else if (TaskState.isDone(state))
       {
-        completedTasks.add(task);
+        if ((state == TaskState.CANCELED_BEFORE_STARTING) &&
+          task.isRecurring())
+        {
+          pendingTasks.add(task);
+        }
+        else
+        {
+          completedTasks.add(task);
+        }
       }
       else
       {
@@ -459,6 +491,53 @@
 
 
   /**
+   * Removes the specified pending iteration of recurring task. It will
+   * be removed from the task set but still be kept in the pending set.
+   *
+   * @param  taskID  The task ID of the pending iteration to remove.
+   *
+   * @return  The task that was removed.
+   *
+   * @throws  DirectoryException  If the requested task is not in the
+   *                              pending queue.
+   */
+  public Task removeRecurringTaskIteration(String taskID)
+         throws DirectoryException
+  {
+    schedulerLock.lock();
+
+    try
+    {
+      Task t = tasks.get(taskID);
+      if (t == null)
+      {
+        Message message = ERR_TASKSCHED_REMOVE_PENDING_NO_SUCH_TASK.get(
+            String.valueOf(taskID));
+        throw new DirectoryException(ResultCode.NO_SUCH_OBJECT, message);
+      }
+
+      if (TaskState.isPending(t.getTaskState()))
+      {
+        tasks.remove(taskID);
+        writeState();
+        return t;
+      }
+      else
+      {
+        Message message = ERR_TASKSCHED_REMOVE_PENDING_NOT_PENDING.get(
+            String.valueOf(taskID));
+        throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, message);
+      }
+    }
+    finally
+    {
+      schedulerLock.unlock();
+    }
+  }
+
+
+
+  /**
    * Removes the specified completed task.
    *
    * @param  taskID  The task ID of the completed task to remove.
@@ -546,7 +625,12 @@
         }
         else
         {
-          Task newIteration = recurringTask.scheduleNextIteration();
+          Task newIteration = null;
+          try {
+            newIteration = recurringTask.scheduleNextIteration();
+          } catch (DirectoryException de) {
+            logError(de.getMessageObject());
+          }
           if (newIteration != null)
           {
             // FIXME -- What to do if new iteration is null?
@@ -746,6 +830,7 @@
    * Operates in a loop, launching tasks at the appropriate time and performing
    * any necessary periodic cleanup.
    */
+  @Override
   public void run()
   {
     isRunning       = true;
@@ -797,6 +882,31 @@
               long waitTime = t.getScheduledStartTime() - TimeThread.getTime();
               sleepTime = Math.min(sleepTime, waitTime);
             }
+            // Recurring task iteration has to spawn the next one
+            // even if the current iteration has been canceled.
+            else if ((state == TaskState.CANCELED_BEFORE_STARTING) &&
+                     t.isRecurring())
+            {
+              if (t.getScheduledStartTime() > TimeThread.getTime()) {
+                // If we're waiting for the start time to arrive,
+                // then see if that will come before the next
+                // sleep time is up.
+                long waitTime =
+                  t.getScheduledStartTime() - TimeThread.getTime();
+                sleepTime = Math.min(sleepTime, waitTime);
+              } else {
+                TaskThread taskThread;
+                if (idleThreads.isEmpty()) {
+                  taskThread = new TaskThread(this, nextThreadID++);
+                  taskThread.start();
+                } else {
+                  taskThread = idleThreads.removeFirst();
+                }
+                runningTasks.add(t);
+                activeThreads.put(t.getTaskID(), taskThread);
+                taskThread.setTask(t);
+              }
+            }
 
             if (state != t.getTaskState())
             {
@@ -876,7 +986,13 @@
   {
     // If the task has finished we don't want to restart it
     TaskState state = task.getTaskState();
-    if (state != null && TaskState.isDone(state))
+
+    // Reset task state if recurring.
+    if (state == TaskState.RECURRING) {
+      state = null;
+    }
+
+    if ((state != null) && TaskState.isDone(state))
     {
       return state;
     }
@@ -1011,26 +1127,6 @@
                     String.valueOf(taskBackend.getTaskRootDN()));
             logError(message);
           }
-          else if (parentDN.equals(taskBackend.getRecurringTasksParentDN()))
-          {
-            try
-            {
-              RecurringTask recurringTask = entryToRecurringTask(entry);
-              addRecurringTask(recurringTask, false);
-            }
-            catch (DirectoryException de)
-            {
-              if (debugEnabled())
-              {
-                TRACER.debugCaught(DebugLogLevel.ERROR, de);
-              }
-
-              Message message =
-                  ERR_TASKSCHED_CANNOT_SCHEDULE_RECURRING_TASK_FROM_ENTRY.
-                    get(String.valueOf(entryDN), de.getMessageObject());
-              logError(message);
-            }
-          }
           else if (parentDN.equals(taskBackend.getScheduledTasksParentDN()))
           {
             try
@@ -1057,6 +1153,26 @@
               logError(message);
             }
           }
+          else if (parentDN.equals(taskBackend.getRecurringTasksParentDN()))
+          {
+            try
+            {
+              RecurringTask recurringTask = entryToRecurringTask(entry);
+              addRecurringTask(recurringTask, false);
+            }
+            catch (DirectoryException de)
+            {
+              if (debugEnabled())
+              {
+                TRACER.debugCaught(DebugLogLevel.ERROR, de);
+              }
+
+              Message message =
+                  ERR_TASKSCHED_CANNOT_SCHEDULE_RECURRING_TASK_FROM_ENTRY.
+                    get(String.valueOf(entryDN), de.getMessageObject());
+              logError(message);
+            }
+          }
           else
           {
             Message message = ERR_TASKSCHED_INVALID_TASK_ENTRY_DN.get(

--
Gitblit v1.10.0