Source code for seesaw.runner

'''Pipeline execution.'''
import datetime
import functools
import os.path
import sys

import seesaw.util
from seesaw.config import realize
from seesaw.event import Event
from seesaw.item import Item

from tornado import ioloop


class Runner(object):
    '''Executes and manages the lifetime of :class:`Pipeline` instances.

    The runner keeps ``concurrent_items`` items enqueued on the current
    pipeline until it is asked to stop or ``max_items`` items have been
    created.  A stop can be requested programmatically through
    :meth:`stop_gracefully` or by touching the optional stop file.

    Events fired by the runner:

    * ``on_status(runner, status)`` with ``"stopping"`` or ``"running"``.
    * ``on_create_item(runner, item)`` before an item is enqueued.
    * ``on_pipeline_start_item(runner, pipeline, item)``.
    * ``on_pipeline_finish_item(runner, pipeline, item)``.
    * ``on_finish(runner)`` once no active items remain.
    '''

    def __init__(self, stop_file=None, concurrent_items=1,
                 max_items=None, keep_data=False):
        '''Create a runner without a pipeline.

        :param stop_file: optional path; when the file appears or its
            modification time increases, a graceful stop is requested.
        :param concurrent_items: number of items to keep active at once.
            The value is passed through ``realize()`` every time items
            are added.
        :param max_items: total number of items to create; a falsy value
            means no limit.
        :param keep_data: forwarded unchanged to every created ``Item``.
        '''
        self.pipeline = None
        self.concurrent_items = concurrent_items
        self.max_items = max_items
        self.keep_data = keep_data

        self.item_count = 0
        self.active_items = set()
        self.stop_flag = False
        self.stop_file = stop_file
        # Baseline against which later stop-file changes are detected.
        self.initial_stop_file_mtime = self.stop_file_mtime()

        self.on_status = Event()
        self.on_create_item = Event()
        self.on_pipeline_start_item = Event()
        self.on_pipeline_finish_item = Event()
        self.on_finish = Event()

        if stop_file:
            # Poll the stop file every 5000 ms.
            ioloop.PeriodicCallback(self.check_stop_file, 5000).start()

    def set_current_pipeline(self, pipeline):
        '''Make ``pipeline`` the pipeline that receives new items.

        The runner subscribes to the new pipeline's start/finish events
        and asks the previously set pipeline, if any, to cancel its items.
        '''
        old_pipeline = self.pipeline

        if pipeline:
            pipeline.on_start_item += self._item_starting
            pipeline.on_finish_item += self._item_finished

        self.pipeline = pipeline

        if old_pipeline:
            # stop any cancellable items in the previous pipeline
            old_pipeline.cancel_items()

    def is_active(self):
        '''Return ``True`` while at least one item is active.'''
        return len(self.active_items) > 0

    def start(self):
        '''Enqueue the initial batch of items.'''
        self.add_items()

    def stop_gracefully(self):
        '''Request a stop: no new items are added once this flag is set.

        The current pipeline, if one is set, is asked to cancel its items
        and is notified of the stop request.  The stop-file baseline is
        refreshed so the same file change is not reported again.
        '''
        print("Stopping when current tasks are completed...")
        self.stop_flag = True
        # The periodic stop-file check can fire before any pipeline has
        # been set, so the pipeline must not be dereferenced blindly.
        if self.pipeline:
            self.pipeline.cancel_items()
            self.pipeline.on_stop_requested()
        self.initial_stop_file_mtime = self.stop_file_mtime()
        self.on_status(self, "stopping")

    def keep_running(self):
        '''Withdraw a previous stop request.'''
        print("Keep running...")
        self.stop_flag = False
        # As in stop_gracefully(): tolerate a runner without a pipeline.
        if self.pipeline:
            self.pipeline.on_stop_canceled()
        self.initial_stop_file_mtime = self.stop_file_mtime()
        self.on_status(self, "running")

    def check_stop_file(self):
        '''Request a graceful stop if the stop file has changed.'''
        if self.stop_file_changed():
            self.stop_gracefully()

    def should_stop(self):
        '''Return whether a stop was requested by flag or by stop file.'''
        return self.stop_flag or self.stop_file_changed()

    def stop_file_changed(self):
        '''Return ``True`` if the stop file is newer than the baseline.

        A stop file that exists now but had no recorded baseline mtime
        also counts as changed.  A missing stop file never does.
        '''
        current_stop_file_mtime = self.stop_file_mtime()
        # Compare against None explicitly: an mtime of 0 is a valid
        # timestamp and must not be mistaken for "no stop file".
        if current_stop_file_mtime is None:
            return False
        return (self.initial_stop_file_mtime is None
                or self.initial_stop_file_mtime < current_stop_file_mtime)

    def stop_file_mtime(self):
        '''Return the stop file's modification time, or ``None``.

        ``None`` is returned when no stop file is configured or when the
        file cannot be stat'ed (for example because it does not exist).
        '''
        if not self.stop_file:
            return None
        # Ask for the mtime directly instead of checking for existence
        # first: the file may be removed between the two calls.
        try:
            return os.path.getmtime(self.stop_file)
        except (OSError, ValueError):
            return None

    def add_items(self):
        '''Create and enqueue items until enough are active.

        Does nothing without a pipeline.  Stops early once ``max_items``
        items have been created in total.
        '''
        if not self.pipeline:
            return

        items_required = int(realize(self.concurrent_items))

        while len(self.active_items) < items_required:
            if self.max_items and self.max_items <= self.item_count:
                return

            self.item_count += 1
            item_id = "{0}-{1}".format(
                seesaw.util.unique_id_str(), self.item_count)
            item = Item(
                pipeline=self.pipeline,
                item_id=item_id,
                item_number=self.item_count,
                keep_data=self.keep_data
            )
            # Fire the event before enqueueing so handlers can attach to
            # the item first.
            self.on_create_item(self, item)
            self.active_items.add(item)
            self.pipeline.enqueue(item)

    def _item_starting(self, pipeline, item):
        '''Relay the pipeline's start event as a runner event.'''
        self.on_pipeline_start_item(self, pipeline, item)

    def _item_finished(self, pipeline, item):
        '''Handle a finished item; failed items are held for 10 seconds.'''
        if item.failed:
            item.log_output("Waiting 10 seconds...")
            ioloop.IOLoop.instance().add_timeout(
                datetime.timedelta(seconds=10),
                functools.partial(
                    self._item_finished_without_delay, pipeline, item)
            )
        else:
            self._item_finished_without_delay(pipeline, item)

    def _item_finished_without_delay(self, pipeline, item):
        '''Retire ``item`` and schedule the follow-up work.

        The follow-up (adding new items, firing ``on_finish``) is handed
        to the IOLoop through a zero-length timeout instead of being run
        inline.
        '''
        self.on_pipeline_finish_item(self, pipeline, item)
        self.active_items.remove(item)

        def add_more_items():
            if not self.should_stop():
                self.add_items()
            if len(self.active_items) == 0:
                self.on_finish(self)

        ioloop.IOLoop.instance().add_timeout(
            datetime.timedelta(), add_more_items
        )
class SimpleRunner(Runner):
    '''Executes a single :class:`Pipeline` instance.

    Besides the base runner behaviour, this class drives the Tornado
    IOLoop itself and echoes every item's output to standard output.
    '''

    def __init__(self, pipeline, stop_file=None, concurrent_items=1,
                 max_items=None, keep_data=False):
        '''Create a runner bound to ``pipeline``.

        All keyword arguments are forwarded unchanged to
        ``Runner.__init__``.
        '''
        Runner.__init__(
            self,
            stop_file=stop_file,
            concurrent_items=concurrent_items,
            max_items=max_items,
            keep_data=keep_data,
        )

        self.set_current_pipeline(pipeline)

        # Echo item output, and leave the IOLoop once the runner is done.
        self.on_create_item += self._handle_create_item
        self.on_finish += self._stop_ioloop

    def start(self):
        '''Enqueue items, run the IOLoop, then clean up the pipeline.

        The call to ``IOLoop.start()`` happens here; the pipeline's
        ``on_cleanup()`` is invoked after it returns.
        '''
        Runner.start(self)
        loop = ioloop.IOLoop.instance()
        loop.start()
        self.pipeline.on_cleanup()

    def _stop_ioloop(self, dummy):
        '''``on_finish`` handler: stop the IOLoop.'''
        loop = ioloop.IOLoop.instance()
        loop.stop()

    def forced_stop(self):
        '''Stop the IOLoop right away without waiting for active items.'''
        print("Stopping immediately...")
        # TODO perhaps the subprocesses should be killed
        loop = ioloop.IOLoop.instance()
        loop.stop()

    def _handle_create_item(self, dummy, item):
        '''``on_create_item`` handler: subscribe to the item's output.'''
        item.on_output += self._handle_item_output

    def _handle_item_output(self, item, data):
        '''Write ``data`` to standard output and flush.

        If writing raises ``UnicodeError``, the text is written again
        with non-ASCII characters replaced.
        '''
        out = sys.stdout
        try:
            out.write(data)
        except UnicodeError:
            ascii_safe = data.encode('ascii', 'replace').decode('ascii')
            out.write(ascii_safe)
        out.flush()