# Source code for seesaw.externalprocess

'''Running subprocesses asynchronously.'''
from __future__ import print_function

import fcntl
import os
import os.path
import subprocess
import functools
import datetime
import pty
import signal
import atexit

import tornado.ioloop
from tornado.ioloop import IOLoop, PeriodicCallback
import tornado.process

from seesaw.event import Event
from seesaw.task import Task
from seesaw.config import realize
import time


# Processes started by AsyncPopen / AsyncPopen2 are registered here while
# they run and removed again once their end has been observed; whatever is
# still present at interpreter exit is reaped by ``cleanup`` below.
_all_procs = set()


@atexit.register
def cleanup():
    '''Forcefully stop any subprocess still registered at interpreter exit.

    Each leftover entry is sent ``terminate()``, given 0.1 seconds, and then
    sent ``kill()``. Entries that expose a ``proc`` attribute (the wrapped
    process object) are signalled through that attribute; anything else is
    signalled directly. Every error is printed rather than raised, so one
    stubborn process cannot stop the rest from being handled.
    '''
    def signal_one(entry, action):
        # Resolve the real process object and invoke ``action`` on it,
        # reporting (not propagating) whatever goes wrong.
        try:
            target = entry.proc if hasattr(entry, 'proc') else entry
            getattr(target, action)()
        except Exception as error:
            print(error)

    if _all_procs:
        print('Subprocess did not exit properly!')

    for leftover in _all_procs:
        print('Killing', leftover)
        signal_one(leftover, 'terminate')
        time.sleep(0.1)
        signal_one(leftover, 'kill')
class AsyncPopen(object):
    '''Asynchronous version of :class:`subprocess.Popen`.

    Deprecated.

    The child's stdout and stderr are both connected to the slave end of a
    pseudo-terminal. The master end is registered on the IOLoop, and whatever
    is read from it is passed to :attr:`on_output`. A 250 ms periodic check
    (plus hang-up/error events on the pty) detects the end of the child. At
    that point :attr:`on_end` fires with ``returncode``. The value may be
    ``None`` when the end was detected through a pty hang-up before the
    child could be reaped.
    '''

    def __init__(self, *args, **kwargs):
        self.args = args
        self.kwargs = kwargs
        self.ioloop = None
        self.master_fd = None
        self.master = None
        # Parent's handle on the pty slave end; closed once the child is done.
        self.slave = None
        self.pipe = None
        self.stdin = None
        self.wait_callback = None
        self.on_output = Event()
        self.on_end = Event()

    @classmethod
    def ignore_sigint(cls):
        '''``preexec_fn`` run in the child before exec.

        Ignores SIGINT and moves the child into its own process group, so a
        Ctrl-C aimed at the parent is not delivered to the child as well.
        '''
        # http://stackoverflow.com/q/5045771/1524507
        signal.signal(signal.SIGINT, signal.SIG_IGN)
        os.setpgrp()

    def run(self):
        '''Spawn the child on a pty and start watching it on the IOLoop.

        Raises whatever :class:`subprocess.Popen` raises. In that case the
        pty descriptors and the IOLoop handler are released first.
        '''
        self.ioloop = IOLoop.instance()
        (master_fd, slave_fd) = pty.openpty()

        # make stdout, stderr non-blocking
        fcntl.fcntl(master_fd, fcntl.F_SETFL,
                    fcntl.fcntl(master_fd, fcntl.F_GETFL) | os.O_NONBLOCK)

        self.master_fd = master_fd
        self.master = os.fdopen(master_fd)

        # listen to stdout, stderr
        self.ioloop.add_handler(master_fd, self._handle_subprocess_stdout,
                                self.ioloop.READ)

        # Kept on the instance so it can be closed when the child finishes;
        # otherwise the parent would leak one descriptor per process.
        self.slave = os.fdopen(slave_fd)
        self.kwargs["stdout"] = self.slave
        self.kwargs["stderr"] = self.slave
        self.kwargs["close_fds"] = True
        self.kwargs["preexec_fn"] = self.ignore_sigint

        try:
            self.pipe = subprocess.Popen(*self.args, **self.kwargs)
        except Exception:
            # Nothing was started: undo the pty/IOLoop setup before re-raising.
            self.ioloop.remove_handler(master_fd)
            self.master.close()
            self.slave.close()
            raise

        self.stdin = self.pipe.stdin

        # check for process exit
        self.wait_callback = PeriodicCallback(self._wait_for_end, 250)
        self.wait_callback.start()

        _all_procs.add(self.pipe)

    def _handle_subprocess_stdout(self, fd, events):
        '''IOLoop handler for the pty master: forward output, then check
        whether the child has finished.'''
        # IOLoop.READ is the public name for the epoll "readable" bit.
        if not self.master.closed and (events & IOLoop.READ) != 0:
            data = self.master.read()
            self.on_output(data)

        self._wait_for_end(events)

    def _wait_for_end(self, events=0):
        '''Tear everything down once the child has exited.

        Also triggered by a hang-up/error event on the pty. Safe to call
        again after teardown; later calls do nothing.
        '''
        if self.master.closed:
            return

        self.pipe.poll()

        # IOLoop.ERROR covers both hang-up and error conditions.
        if self.pipe.returncode is not None or \
                (events & IOLoop.ERROR) != 0:
            self.wait_callback.stop()
            # Unregister before closing so the poller never sees a stale fd.
            self.ioloop.remove_handler(self.master_fd)
            self.master.close()
            self.slave.close()
            # Deregister before notifying, so a failing on_end handler cannot
            # leave an already-finished process for the atexit cleanup.
            _all_procs.discard(self.pipe)
            self.on_end(self.pipe.returncode)
class AsyncPopen2(object):
    '''Adapter for the legacy AsyncPopen

    Runs the command through :class:`tornado.process.Subprocess`. Both
    stdout and stderr are streamed into :attr:`on_output`, and the exit
    code is reported through :attr:`on_end`.
    '''

    def __init__(self, *args, **kwargs):
        self.args = args
        self.kwargs = kwargs
        self.on_output = Event()
        self.on_end = Event()
        self.pipe = None

    def run(self):
        '''Start the subprocess and hook its output and exit callbacks.'''
        stream = tornado.process.Subprocess.STREAM
        options = self.kwargs
        options["stdout"] = stream
        options["stderr"] = stream
        options["preexec_fn"] = AsyncPopen.ignore_sigint

        self.pipe = tornado.process.Subprocess(*self.args, **options)

        # stdout first, then stderr; each chunk and the final remainder go
        # to the same forwarding callback.
        forward = self._handle_subprocess_stdout
        for output in (self.pipe.stdout, self.pipe.stderr):
            output.read_until_close(callback=forward,
                                    streaming_callback=forward)

        self.pipe.set_exit_callback(self._end_callback)
        _all_procs.add(self.pipe)

    def _handle_subprocess_stdout(self, data):
        self.on_output(data)

    def _end_callback(self, return_code):
        self.on_end(return_code)
        _all_procs.remove(self.pipe)

    @property
    def stdin(self):
        '''The subprocess's stdin, as exposed by the wrapped process.'''
        return self.pipe.stdin
class ExternalProcess(Task):
    '''External subprocess runner.

    Runs ``args`` (realized against the item) as a subprocess, feeds it
    :meth:`stdin_data`, logs everything it prints, and then completes,
    retries, or fails the item depending on its exit code.

    Args:
        name: Task name.
        args: Command line; values are resolved per item with ``realize``.
        max_tries: Maximum number of attempts, or ``None`` for unlimited.
        retry_delay: Seconds to wait before a retry.
        accept_on_exit_code: Exit codes treated as success (default ``[0]``).
        retry_on_exit_code: Exit codes that permit a retry. ``None`` means
            any non-accepted exit code may be retried.
        env: Environment passed as ``env`` to the subprocess (per
            :class:`subprocess.Popen`, it replaces rather than extends the
            parent environment). The mapping is copied, so the caller's dict
            is never modified. ``PYTHONIOENCODING`` defaults to
            ``utf8:replace``.
    '''

    def __init__(self, name, args, max_tries=1, retry_delay=30,
                 accept_on_exit_code=None, retry_on_exit_code=None,
                 env=None):
        Task.__init__(self, name)
        self.args = args
        self.max_tries = max_tries
        self.retry_delay = retry_delay
        if accept_on_exit_code is not None:
            self.accept_on_exit_code = accept_on_exit_code
        else:
            self.accept_on_exit_code = [0]
        self.retry_on_exit_code = retry_on_exit_code
        # Copy so the default below never leaks into the caller's dict.
        self.env = dict(env) if env else {}
        if 'PYTHONIOENCODING' not in self.env:
            self.env['PYTHONIOENCODING'] = 'utf8:replace'

    def enqueue(self, item):
        '''Start the item and launch the first attempt.'''
        self.start_item(item)
        item.log_output("Starting %s for %s\n" % (self, item.description()))
        item["tries"] = 0
        item["ExternalProcess.stdin_write_error"] = False
        self.process(item)

    def stdin_data(self, item):
        '''Bytes written to the subprocess's stdin; nothing by default.'''
        return b""

    def process(self, item):
        '''Run one attempt: spawn the subprocess and feed it stdin_data().'''
        # Each attempt starts clean: a stdin failure on an earlier attempt
        # must not make a later, successful attempt count as a failure.
        item["ExternalProcess.stdin_write_error"] = False

        with self.task_cwd():
            p = AsyncPopen2(
                args=realize(self.args, item),
                env=realize(self.env, item),
                stdin=subprocess.PIPE,
                close_fds=True
            )

            p.on_output += functools.partial(self.on_subprocess_stdout, p,
                                             item)
            p.on_end += functools.partial(self.on_subprocess_end, item)

            p.run()

            try:
                try:
                    p.stdin.write(self.stdin_data(item))
                finally:
                    # close() flushes buffered data, so it can also fail
                    # (e.g. broken pipe if the child already exited); keep
                    # it inside the error handling below.
                    p.stdin.close()
            except Exception as error:
                # FIXME: We need to properly propagate errors
                item.log_output("Error writing to process: %s\n" % str(error))
                item["ExternalProcess.stdin_write_error"] = True

    def on_subprocess_stdout(self, pipe, item, data):
        '''Append a chunk of subprocess output to the item's log.'''
        item.log_output(data, full_line=False)

    def on_subprocess_end(self, item, returncode):
        '''Route the finished attempt to the success or error handler.'''
        if returncode in self.accept_on_exit_code and \
                not item["ExternalProcess.stdin_write_error"]:
            self.handle_process_result(returncode, item)
        else:
            self.handle_process_error(returncode, item)

    def handle_process_result(self, exit_code, item):
        '''Log success and complete the item.'''
        item.log_output("Finished %s for %s\n" % (self, item.description()))
        self.complete_item(item)

    def handle_process_error(self, exit_code, item):
        '''Record a failed attempt; schedule a retry or fail the item.'''
        item["tries"] += 1

        item.log_output(
            "Process %s returned exit code %d for %s\n" %
            (self, exit_code, item.description())
        )
        item.log_error(self, exit_code)

        retry_acceptable = self.max_tries is None or \
            item["tries"] < self.max_tries
        exit_status_indicates_retry = self.retry_on_exit_code is None or \
            exit_code in self.retry_on_exit_code or \
            item["ExternalProcess.stdin_write_error"]

        if retry_acceptable and exit_status_indicates_retry:
            item.log_output(
                "Retrying %s for %s after %d seconds...\n" %
                (self, item.description(), self.retry_delay)
            )
            IOLoop.instance().add_timeout(
                datetime.timedelta(seconds=self.retry_delay),
                functools.partial(self.process, item)
            )

        else:
            item.log_output("Failed %s for %s\n" % (self, item.description()))
            self.fail_item(item)
class WgetDownload(ExternalProcess):
    '''Download with Wget process runner.

    An :class:`ExternalProcess` named ``"WgetDownload"`` whose stdin
    payload comes from the optional ``stdin_data_function(item)``.
    '''

    def __init__(self, args, max_tries=1, accept_on_exit_code=None,
                 retry_on_exit_code=None, env=None, stdin_data_function=None):
        if accept_on_exit_code is None:
            accept_on_exit_code = [0]

        ExternalProcess.__init__(
            self, "WgetDownload",
            args=args,
            max_tries=max_tries,
            accept_on_exit_code=accept_on_exit_code,
            retry_on_exit_code=retry_on_exit_code,
            env=env,
        )
        self.stdin_data_function = stdin_data_function

    def stdin_data(self, item):
        '''Delegate to ``stdin_data_function``; send nothing without one.'''
        if not self.stdin_data_function:
            return b""
        return self.stdin_data_function(item)
class RsyncUpload(ExternalProcess):
    '''Upload with Rsync process runner.

    The files to send are written to rsync's stdin (``--files-from=-``),
    one per line, relative to ``target_source_path``.
    '''

    def __init__(self, target, files, target_source_path="./", bwlimit="0",
                 max_tries=None, extra_args=None):
        command = ["rsync",
                   "-rltvz",
                   "--compress-level=9",
                   "--timeout=300",
                   "--contimeout=300",
                   "--progress",
                   "--bwlimit", bwlimit]
        if extra_args is not None:
            command += extra_args
        command += ["--files-from=-", target_source_path, target]

        ExternalProcess.__init__(self, "RsyncUpload",
                                 args=command,
                                 max_tries=max_tries)
        self.files = files
        self.target_source_path = target_source_path

    def stdin_data(self, item):
        '''UTF-8 file list: each file relative to the source path, one per line.'''
        def listing_line(path):
            relative = os.path.relpath(
                realize(path, item),
                realize(self.target_source_path, item))
            return "%s\n" % relative

        return "".join(
            [listing_line(path) for path in realize(self.files, item)]
        ).encode('utf-8')
class CurlUpload(ExternalProcess):
    '''Upload with Curl process runner.

    PUTs ``filename`` to ``target`` with ``curl --upload-file``, discarding
    the response body and printing the final (post-redirect) URL.
    '''

    def __init__(self, target, filename, connect_timeout="60", speed_limit="1",
                 speed_time="900", max_tries=None):
        connect_text = str(connect_timeout)
        limit_text = str(speed_limit)
        window_text = str(speed_time)

        command = [
            "curl",
            "--fail",
            "--output", "/dev/null",
            "--connect-timeout", connect_text,
            # minimum upload speed in bytes/second (default 1 B/s)
            "--speed-limit", limit_text,
            # abort when below --speed-limit for this many seconds
            # (default 900)
            "--speed-time", window_text,
            "--header", "X-Curl-Limits: inf,%s,%s" % (limit_text, window_text),
            "--write-out", "Upload server: %{url_effective}\\n",
            "--location",
            "--upload-file", filename,
            target,
        ]

        ExternalProcess.__init__(self, "CurlUpload",
                                 args=command,
                                 max_tries=max_tries)