Source code for seesaw.task

'''Managing steps in a work unit.'''
import contextlib
import os
import traceback

from seesaw.event import Event
from seesaw.item import Item
from seesaw.config import realize


[docs]class Task(object): '''A step in the download process of an :class:`Item`. ''' def __init__(self, name): self.name = name self.cwd = os.getcwd() self.on_start_item = Event() self.on_complete_item = Event() self.on_fail_item = Event() self.on_finish_item = Event()
[docs] def start_item(self, item): item.set_task_status(self, Item.TaskStatus.running) self.on_start_item(self, item)
[docs] def fail_item(self, item): item.set_task_status(self, Item.TaskStatus.failed) self.on_fail_item(self, item) self.on_finish_item(self, item)
[docs] def complete_item(self, item): item.set_task_status(self, Item.TaskStatus.completed) self.on_complete_item(self, item) self.on_finish_item(self, item)
@contextlib.contextmanager
[docs] def task_cwd(self): curdir = os.getcwd() try: os.chdir(self.cwd) yield finally: os.chdir(curdir)
[docs] def fill_ui_task_list(self, task_list): task_list.append((self, self.name))
def __str__(self): return self.name
[docs]class SimpleTask(Task): '''A subclassable :class:`Task` that should do one small thing well. Example:: class MyTask(SimpleTask): def process(self, item): item['my_message'] = 'hello world!' ''' def __init__(self, name): Task.__init__(self, name)
[docs] def enqueue(self, item): self.start_item(item) item.log_output("Starting %s for %s\n" % (self, item.description())) try: with self.task_cwd(): self.process(item) except Exception as e: item.log_output("Failed %s for %s\n" % (self, item.description())) item.log_output("%s\n" % traceback.format_exc()) item.log_error(self, e) self.fail_item(item) else: item.log_output("Finished %s for %s\n" % (self, item.description())) self.complete_item(item)
[docs] def process(self, item): # TODO: should this raise NotImplemented or be decorated with # abc.abstractmethod? pass
def __str__(self): return self.name
[docs]class LimitConcurrent(Task): '''Restricts the number of tasks of the same type that can be run at once. ''' def __init__(self, concurrency, inner_task): Task.__init__(self, "LimitConcurrent") self.concurrency = concurrency self.inner_task = inner_task self.inner_task.on_complete_item += self._inner_task_complete_item self.inner_task.on_fail_item += self._inner_task_fail_item self._queue = [] self._working = 0
[docs] def enqueue(self, item): if self._working < realize(self.concurrency, item): self._working += 1 self.inner_task.enqueue(item) else: self._queue.append(item)
def _inner_task_complete_item(self, task, item): self._working -= 1 if len(self._queue) > 0: self._working += 1 self.inner_task.enqueue(self._queue.pop(0)) self.complete_item(item) def _inner_task_fail_item(self, task, item): self._working -= 1 if len(self._queue) > 0: self._working += 1 self.inner_task.enqueue(self._queue.pop(0)) self.fail_item(item)
[docs] def fill_ui_task_list(self, task_list): self.inner_task.fill_ui_task_list(task_list)
def __str__(self): return "LimitConcurrent({0} x {1} )".format( self.concurrency, self.inner_task)
[docs]class ConditionalTask(Task): '''Runs a task optionally.''' def __init__(self, condition_function, inner_task): Task.__init__(self, "Conditional") self.condition_function = condition_function self.inner_task = inner_task self.inner_task.on_complete_item += self._inner_task_complete_item self.inner_task.on_fail_item += self._inner_task_fail_item
[docs] def enqueue(self, item): if self.condition_function(item): self.inner_task.enqueue(item) else: item.log_output("Skipping tasks for this item.") self.complete_item(item)
def _inner_task_complete_item(self, task, item): self.complete_item(item) def _inner_task_fail_item(self, task, item): self.fail_item(item)
[docs] def fill_ui_task_list(self, task_list): self.inner_task.fill_ui_task_list(task_list)
def __str__(self): return "Conditional(" + str(self.inner_task) + ")"
[docs]class SetItemKey(SimpleTask): '''Set a value onto a task.''' def __init__(self, key, value): SimpleTask.__init__(self, "SetItemKey") self.key = key self.value = value
[docs] def process(self, item): item[self.key] = realize(self.value, self)
def __str__(self): return "SetItemKey(" + str(self.key) + ": " + str(self.value) + ")"
[docs]class PrintItem(SimpleTask): '''Output the name of the :class:`Item`.''' def __init__(self): SimpleTask.__init__(self, "PrintItem")
[docs] def process(self, item): item.log_output("%s\n" % str(item))