Source code for seesaw.pipeline

import contextlib
import os
import traceback

import tornado.stack_context

from seesaw.event import Event


[docs]class Pipeline(object): '''The sequence of steps that complete a :class:`Task`. Your pipeline will probably be something like this: 1. Request an assignment from the tracker. 2. Run Wget to download the file. 3. Upload the downloaded file with rsync. 4. Tell the tracker that the assignment is done. ''' def __init__(self, *tasks): self.cwd = os.getcwd() self.data_dir = os.path.join(self.cwd, "data") self.on_start_item = Event() self.on_complete_item = Event() self.on_fail_item = Event() self.on_cancel_item = Event() self.on_finish_item = Event() self.on_cleanup = Event() self.on_stop_requested = Event() self.on_stop_canceled = Event() self.project = None self.items_in_pipeline = set() self.tasks = [] for task in tasks: self.add_task(task)
[docs] def add_task(self, task): task.on_complete_item += self._task_complete_item task.on_fail_item += self._task_fail_item self.tasks.append(task)
[docs] def enqueue(self, item): self.items_in_pipeline.add(item) self.on_start_item(self, item) self._enqueue_with_except(self.tasks[0], item)
def _enqueue_with_except(self, task, item): @contextlib.contextmanager def handle_item_exception(e_type, e_value, tb): item.log_output("Failed %s for %s\n" % (task, item.description())) item.log_output( "".join(traceback.format_exception(e_type, e_value, tb)) ) item.log_error(self, e_value) task.fail_item(item) with tornado.stack_context.NullContext(): with tornado.stack_context.ExceptionStackContext( handle_item_exception): task.enqueue(item) def _task_complete_item(self, task, item): task_index = self.tasks.index(task) if len(self.tasks) <= task_index + 1: self._complete_item(item) else: self._enqueue_with_except(self.tasks[task_index + 1], item) def _task_fail_item(self, task, item): self._fail_item(item) def _cancel_item(self, item): if item in self.items_in_pipeline: item.cancel() self.items_in_pipeline.remove(item) self.on_cancel_item(self, item) self.on_finish_item(self, item) else: # XXX: Reaching here indicates a programming problem. # Refactoring is required. item.log_output( 'Warning: Ignoring extra cancel event.\n' + ''.join(traceback.format_stack())) def _complete_item(self, item): if item in self.items_in_pipeline: item.complete() self.items_in_pipeline.remove(item) self.on_complete_item(self, item) self.on_finish_item(self, item) else: # See comment above. item.log_output( 'Warning: Ignoring extra complete event.\n' + ''.join(traceback.format_stack())) def _fail_item(self, item): if item in self.items_in_pipeline: item.fail() self.items_in_pipeline.remove(item) self.on_fail_item(self, item) self.on_finish_item(self, item) else: # See comment above. item.log_output( 'Warning: Ignoring extra fail event.\n' + ''.join(traceback.format_stack()))
[docs] def cancel_items(self): cancel_items = [item for item in self.items_in_pipeline if item.may_be_canceled] for item in cancel_items: self._cancel_item(item)
[docs] def ui_task_list(self): task_list = [] for task in self.tasks: task.fill_ui_task_list(task_list) return task_list
def __str__(self): return "Pipeline:\n -> " + ("\n -> ".join(map(str, self.tasks)))