'''The warrior server.
The warrior phones home to Warrior HQ
(https://github.com/ArchiveTeam/warrior-hq).
'''
import datetime
from distutils.version import StrictVersion
import json
import os
import os.path
import re
import shutil
import subprocess
import sys
import time
import logging
from tornado import gen
from tornado import ioloop
from tornado.httpclient import AsyncHTTPClient
import seesaw
from seesaw.config import NumberConfigValue, StringConfigValue, ConfigValue
from seesaw.config import realize
from seesaw.event import Event
from seesaw.externalprocess import AsyncPopen2
from seesaw.runner import Runner
import seesaw.six
try:
from collections import OrderedDict
except ImportError:
from ordereddict import OrderedDict
if seesaw.six.PY2:
bigint = long # @UndefinedVariable pylint: disable=undefined-variable
else:
bigint = int
logger = logging.getLogger(__name__)
[docs]class ConfigManager(object):
'''Manages the configuration.'''
def __init__(self, config_file):
self.config_file = config_file
self.config_memory = {}
self.config_values = OrderedDict()
self.load()
[docs] def add(self, config_value):
self.config_values[config_value.name] = config_value
if config_value.name in self.config_memory:
remembered_value = self.config_memory[config_value.name]
if config_value.check_value(remembered_value) is None:
config_value.set_value(remembered_value)
self.save()
[docs] def remove(self, name):
if name in self.config_values:
del self.config_values[name]
self.save()
[docs] def all_valid(self):
return all([c.is_valid() for c in self])
[docs] def set_value(self, name, value):
if name in self.config_values:
if self.config_values[name].set_value(value):
self.config_memory[name] = value
self.save()
return True
return False
[docs] def load(self):
try:
with open(self.config_file) as f:
self.config_memory = json.load(f)
except Exception:
logger.exception('Error loading config.')
self.config_memory = {}
[docs] def save(self):
with open(self.config_file, "w") as f:
json.dump(self.config_memory, f)
def __iter__(self):
return iter(self.config_values.values())
[docs] def editable_values(self):
return [v for v in self if v.editable]
[docs]class BandwidthMonitor(object):
'''Extracts the bandwidth usage from the system stats.'''
devre = re.compile(r"^\s*([a-z0-9]+):(.+)$")
def __init__(self, device):
self.device = device
self.prev_time = None
self.prev_stats = None
self.bandwidth = None
self._prev_received = 0
self._prev_sent = 0
self._overflow_received = 0
self._overflow_sent = 0
self.update()
[docs] def current_stats(self):
if self.prev_stats and self.bandwidth:
return {
"received": self.prev_stats[0],
"sent": self.prev_stats[1],
"receiving": self.bandwidth[0],
"sending": self.bandwidth[1]
}
return None
[docs] def update(self):
cur_time = time.time()
cur_stats = self._get_stats()
if self.prev_stats is not None and cur_stats is not None:
time_delta = cur_time - self.prev_time
if time_delta:
self.bandwidth = [
(cur_stats[0] - self.prev_stats[0]) / time_delta,
(cur_stats[1] - self.prev_stats[1]) / time_delta,
]
self.prev_time = cur_time
self.prev_stats = cur_stats
return self.bandwidth
def _get_stats(self):
with open("/proc/net/dev") as f:
lines = f.readlines()
for line in lines:
m = self.devre.match(line)
if m and m.group(1) == self.device:
fields = m.group(2).split()
received = bigint(fields[0])
sent = bigint(fields[8])
if self._prev_received > received:
self._overflow_received += 2 ** 32
self._prev_received = received
if self._prev_sent > sent:
self._overflow_sent += 2 ** 32
self._prev_sent = sent
return [received + self._overflow_received,
sent + self._overflow_sent ]
return None
[docs]class Warrior(object):
'''The warrior god object.'''
def __init__(self, projects_dir, data_dir, warrior_hq_url,
real_shutdown=False, keep_data=False):
if not os.access(projects_dir, os.W_OK):
raise Exception(
"Couldn't write to projects directory: %s" % projects_dir)
if not os.access(data_dir, os.W_OK):
raise Exception("Couldn't write to data directory: %s" % data_dir)
self.projects_dir = projects_dir
self.data_dir = data_dir
self.warrior_hq_url = warrior_hq_url
self.real_shutdown = real_shutdown
self.keep_data = keep_data
# disable the password prompts
self.gitenv = dict(
list(os.environ.items()) +
list({
'GIT_ASKPASS': 'echo',
'SSH_ASKPASS': 'echo'
}.items())
)
self.warrior_id = StringConfigValue(
name="warrior_id",
title="Warrior ID",
description="The unique number of your warrior instance.",
editable=False
)
self.selected_project_config_value = StringConfigValue(
name="selected_project",
title="Selected project",
description="The project (to be continued when the warrior "
"restarts).",
default="none",
editable=False
)
self.downloader = StringConfigValue(
name="downloader",
title="Your nickname",
description="We use your nickname to show your results on our "
"tracker. Letters and numbers only.",
regex="^[-_a-zA-Z0-9]{3,30}$",
advanced=False
)
self.concurrent_items = NumberConfigValue(
name="concurrent_items",
title="Concurrent items",
description="How many items should the warrior download at a "
"time? (Max: 6)",
min=1,
max=6,
default=2
)
self.http_username = StringConfigValue(
name="http_username",
title="HTTP username",
description="Enter a username to protect the web interface, "
"or leave empty.",
default=""
)
self.http_password = StringConfigValue(
name="http_password",
title="HTTP password",
description="Enter a password to protect the web interface, "
"or leave empty.",
default=""
)
self.config_manager = ConfigManager(os.path.join(projects_dir,
"config.json"))
self.config_manager.add(self.warrior_id)
self.config_manager.add(self.selected_project_config_value)
self.config_manager.add(self.downloader)
self.config_manager.add(self.concurrent_items)
self.config_manager.add(self.http_username)
self.config_manager.add(self.http_password)
self.bandwidth_monitor = BandwidthMonitor("eth0")
self.bandwidth_monitor.update()
self.runner = Runner(concurrent_items=self.concurrent_items,
keep_data=self.keep_data)
self.runner.on_finish += self.handle_runner_finish
self.current_project_name = None
self.current_project = None
self.selected_project = None
self.projects = {}
self.installed_projects = set()
self.failed_projects = set()
self.on_projects_loaded = Event()
self.on_project_installing = Event()
self.on_project_installed = Event()
self.on_project_installation_failed = Event()
self.on_project_refresh = Event()
self.on_project_selected = Event()
self.on_status = Event()
self.on_broadcast_message_received = Event()
self.http_client = AsyncHTTPClient()
self.installing = False
self.shut_down_flag = False
self.reboot_flag = False
io_loop = ioloop.IOLoop.instance()
def update_warror_callback():
io_loop.add_future(
self.update_warrior_hq(), lambda fut: fut.result()
)
def update_project_callback():
io_loop.add_future(self.update_project(), lambda fut: fut.result())
self.hq_updater = ioloop.PeriodicCallback(update_warror_callback,
10 * 60 * 1000)
self.project_updater = ioloop.PeriodicCallback(update_project_callback,
30 * 60 * 1000)
self.forced_reboot_timeout = None
self.lat_lng = None
self.find_lat_lng()
self.install_output = None
self.broadcast_message = None
self.contacting_hq_failed = False
[docs] def find_lat_lng(self):
# response = self.http_client.fetch("http://www.maxmind.com/app/mylocation", self.handle_lat_lng, user_agent="")
pass
[docs] def handle_lat_lng(self, response):
m = re.search(r"geoip-demo-results-tbodyLatitude/Longitude</td>"
r"\s*<td[^>]*>\s*([-/.0-9]+)\s*</td>",
response.body)
if m:
self.lat_lng = m.group(1)
[docs] def bandwidth_stats(self):
self.bandwidth_monitor.update()
return self.bandwidth_monitor.current_stats()
@gen.coroutine
[docs] def update_warrior_hq(self):
logger.debug('Update warrior hq.')
if realize(self.warrior_id) is None:
headers = {"Content-Type": "application/json"}
user_agent = "ArchiveTeam Warrior/%s" % seesaw.__version__
body = json.dumps(
{"warrior": {"version": seesaw.__version__}}
)
response = yield self.http_client.fetch(
os.path.join(self.warrior_hq_url,
"api/register.json"),
method="POST",
headers=headers,
user_agent=user_agent,
body=body
)
if response.code == 200:
data = json.loads(response.body.decode('utf-8'))
logger.info("Received Warrior ID '%s'." % data["warrior_id"])
self.config_manager.set_value("warrior_id", data["warrior_id"])
self.fire_status()
else:
logger.error("HTTP error %s" % (response.code))
self.fire_status()
return
else:
logger.debug("Warrior ID '%s'." % realize(self.warrior_id))
headers = {"Content-Type": "application/json"}
user_agent = "ArchiveTeam Warrior/%s %s" % (seesaw.__version__,
seesaw.runner_type)
body = json.dumps({
"warrior": {
"warrior_id": realize(self.warrior_id),
"lat_lng": self.lat_lng,
"downloader": realize(self.downloader),
"selected_project": realize(self.selected_project_config_value)
}})
response = yield self.http_client.fetch(
os.path.join(self.warrior_hq_url,
"api/update.json"),
method="POST",
headers=headers,
user_agent=user_agent,
body=body
)
if response.code == 200:
data = json.loads(response.body.decode('utf-8'))
if StrictVersion(seesaw.__version__) < \
StrictVersion(data["warrior"]["seesaw_version"]):
# time for an update
logger.info("Reboot for Seesaw update.")
self.reboot_gracefully()
# schedule a forced reboot after two days
self.schedule_forced_reboot()
return
projects_list = data["projects"]
self.projects = OrderedDict(
[(project["name"], project) for project in projects_list])
for project_data in self.projects.values():
if "deadline" in project_data:
project_data["deadline_int"] = time.mktime(
time.strptime(project_data["deadline"],
"%Y-%m-%dT%H:%M:%SZ"))
previous_project_choice = realize(
self.selected_project_config_value)
if self.selected_project and \
self.selected_project not in self.projects:
yield self.select_project(None)
elif previous_project_choice in self.projects:
# select previous project
yield self.select_project(previous_project_choice)
elif previous_project_choice == "auto":
# ArchiveTeam's choice
if "auto_project" in data:
yield self.select_project(data["auto_project"])
else:
yield self.select_project(None)
self.contacting_hq_failed = False
self.on_projects_loaded(self, self.projects)
self.broadcast_message = data.get('broadcast_message')
self.on_broadcast_message_received(
self, data.get('broadcast_message'))
else:
logger.error("HTTP error %s" % (response.code))
self.contacting_hq_failed = True
# We don't set projects to {} because it causes the
# "Stop Current" project button to disappear
for name in tuple(self.projects):
if name != self.selected_project:
del self.projects[name]
self.on_projects_loaded(self, self.projects)
@gen.coroutine
[docs] def install_project(self, project_name):
logger.debug('Install project %s', project_name)
self.installed_projects.discard(project_name)
if project_name in self.projects and not self.installing:
self.installing = project_name
self.install_output = []
project = self.projects[project_name]
project_path = os.path.join(self.projects_dir, project_name)
self.on_project_installing(self, project)
if project_name in self.failed_projects:
if os.path.exists(project_path):
shutil.rmtree(project_path)
self.failed_projects.discard(project_name)
if os.path.exists(project_path):
subprocess.Popen(
args=["git", "config", "remote.origin.url",
project["repository"]],
cwd=project_path
).communicate()
logger.debug('git pull from %s', project["repository"])
p = AsyncPopen2(
args=["git", "pull"],
cwd=project_path,
env=self.gitenv
)
else:
logger.debug('git clone')
p = AsyncPopen2(
args=["git", "clone", project["repository"], project_path],
env=self.gitenv
)
p.on_output += self.collect_install_output
p.on_end += yield gen.Callback("gitend")
p.run()
result = yield gen.Wait("gitend")
if result != 0:
self.install_output.append("\ngit returned %d\n" % result)
logger.error(
"Project failed to install: %s",
"".join(self.install_output)
)
self.on_project_installation_failed(
self, project, "".join(self.install_output))
self.installing = None
self.failed_projects.add(project_name)
raise gen.Return(False)
else:
logger.debug(
"git operation: %s", "".join(self.install_output)
)
project_install_file = os.path.join(project_path,
"warrior-install.sh")
if os.path.exists(project_install_file):
p = AsyncPopen2(
args=[project_install_file],
cwd=project_path
)
p.on_output += self.collect_install_output
p.on_end += yield gen.Callback("installend")
p.run()
result = yield gen.Wait("installend")
if result != 0:
self.install_output.append(
"\nCustom installer returned %d\n" % result)
logger.error(
"Custom installer failed to install: %s",
"".join(self.install_output)
)
self.on_project_installation_failed(
self, project, "".join(self.install_output))
self.installing = None
self.failed_projects.add(project_name)
raise gen.Return(False)
data_dir = os.path.join(self.data_dir, "data")
if os.path.exists(data_dir):
shutil.rmtree(data_dir)
os.makedirs(data_dir)
project_data_dir = os.path.join(project_path, "data")
if os.path.islink(project_data_dir):
os.remove(project_data_dir)
elif os.path.isdir(project_data_dir):
shutil.rmtree(project_data_dir)
os.symlink(data_dir, project_data_dir)
self.installed_projects.add(project_name)
logger.debug('Install complete %s', "".join(self.install_output))
self.on_project_installed(self, project,
"".join(self.install_output))
self.installing = None
raise gen.Return(True)
@gen.coroutine
[docs] def update_project(self):
logger.debug('Update project.')
if self.selected_project and \
(yield self.check_project_has_update(self.selected_project)):
# restart project
yield self.start_selected_project(reinstall=True)
@gen.coroutine
[docs] def check_project_has_update(self, project_name):
logger.debug('Check project has update %s', project_name)
if project_name in self.projects:
project = self.projects[project_name]
project_path = os.path.join(self.projects_dir, project_name)
self.install_output = []
if not os.path.exists(project_path):
logger.debug("Project doesn't exist.")
raise gen.Return(True)
subprocess.Popen(
args=["git", "config", "remote.origin.url",
project["repository"]],
cwd=project_path
).communicate()
logger.debug('git fetch')
p = AsyncPopen2(
args=["git", "fetch"],
cwd=project_path,
env=self.gitenv
)
p.on_output += self.collect_install_output
p.on_end += yield gen.Callback("gitend")
p.run()
result = yield gen.Wait("gitend")
if result != 0:
logger.debug('Got return code %s', result)
raise gen.Return(True)
output = subprocess.Popen(
args=["git", "rev-list", "HEAD..origin/HEAD"],
cwd=project_path,
stdout=subprocess.PIPE
).communicate()[0]
if output.strip():
logger.debug('True')
raise gen.Return(True)
else:
logger.debug('False')
raise gen.Return(False)
[docs] def collect_install_output(self, data):
if isinstance(data, seesaw.six.binary_type):
text = data.decode('ascii', 'replace')
else:
text = data
sys.stdout.write(text)
text = re.sub("[\x00-\x08\x0b\x0c]", "", text)
self.install_output.append(text)
@gen.coroutine
[docs] def select_project(self, project_name):
logger.debug('Select project %s', project_name)
if project_name == "auto":
yield self.update_warrior_hq()
return
if project_name not in self.projects:
logger.debug("Project doesn't exist.")
project_name = None
if project_name != self.selected_project:
# restart
self.selected_project = project_name
self.on_project_selected(self, project_name)
yield self.start_selected_project()
[docs] def clone_project(self, project_name, project_path):
logger.debug('Clone project %s %s', project_name, project_path)
version_string = subprocess.Popen(
args=["git", "log", "-1", "--pretty=%h"],
cwd=project_path,
stdout=subprocess.PIPE
).communicate()[0].strip().decode('ascii')
logger.debug('Cloning version %s', version_string)
project_versioned_path = os.path.join(
self.data_dir, "projects",
"%s-%s" % (project_name, version_string))
if not os.path.exists(project_versioned_path):
if not os.path.exists(os.path.join(self.data_dir, "projects")):
os.makedirs(os.path.join(self.data_dir, "projects"))
subprocess.Popen(
args=["git", "clone", project_path, project_versioned_path],
env=self.gitenv
).communicate()
return project_versioned_path
[docs] def load_pipeline(self, pipeline_path, context):
logger.debug('Load pipeline %s', pipeline_path)
dirname, basename = os.path.split(pipeline_path)
if dirname == "":
dirname = "."
with open(pipeline_path) as f:
pipeline_str = f.read()
ConfigValue.start_collecting()
local_context = context
global_context = context
curdir = os.getcwd()
try:
os.chdir(dirname)
exec(pipeline_str, local_context, global_context)
finally:
os.chdir(curdir)
config_values = ConfigValue.stop_collecting()
project = local_context["project"]
pipeline = local_context["pipeline"]
pipeline.project = project
return (project, pipeline, config_values)
@gen.coroutine
[docs] def start_selected_project(self, reinstall=False):
logger.debug(
'Start selected project %s (reinstall=%s)',
self.selected_project, reinstall
)
project_name = self.selected_project
if project_name in self.projects:
# install or update project if necessary
if project_name not in self.installed_projects or \
reinstall or \
(yield self.check_project_has_update(project_name)):
result = yield self.install_project(project_name)
if not result:
logger.warning(
"Project %s did not install correctly and "
"we're ignoring this problem.",
project_name
)
return
# remove the configuration variables from the previous project
if self.current_project:
for config_value in self.current_project.config_values:
self.config_manager.remove(config_value.name)
# the path with the project code
# (this is the most recent code from the repository)
project_path = os.path.join(self.projects_dir, project_name)
# clone the project code to a versioned directory
# where the pipeline is actually run
project_versioned_path = self.clone_project(project_name,
project_path)
# load the pipeline from the versioned directory
pipeline_path = os.path.join(project_versioned_path, "pipeline.py")
(project, pipeline, config_values) = self.load_pipeline(
pipeline_path, {"downloader": self.downloader})
# add the configuration values to the config manager
for config_value in config_values:
self.config_manager.add(config_value)
project.config_values = config_values
# start the pipeline
if not self.shut_down_flag and not self.reboot_flag:
self.runner.set_current_pipeline(pipeline)
self.current_project_name = project_name
self.current_project = project
self.on_project_refresh(self, self.current_project, self.runner)
self.fire_status()
if not self.shut_down_flag and not self.reboot_flag:
self.runner.start()
else:
# project_name not in self.projects,
# stop the current project (if there is one)
logger.debug("Project does not exist.")
self.runner.set_current_pipeline(None)
self.fire_status()
[docs] def handle_runner_finish(self, runner):
logger.info("Runner has finished.")
if self.current_project:
for config_value in self.current_project.config_values:
self.config_manager.remove(config_value.name)
self.current_project_name = None
self.current_project = None
self.on_project_refresh(self, self.current_project, self.runner)
self.fire_status()
if self.shut_down_flag or self.reboot_flag:
ioloop.IOLoop.instance().stop()
if self.real_shutdown:
if self.shut_down_flag:
os.system("sudo shutdown -h now")
elif self.reboot_flag:
os.system("sudo shutdown -r now")
[docs] def start(self):
io_loop = ioloop.IOLoop.instance()
if self.real_shutdown:
# schedule a reboot
io_loop.add_timeout(datetime.timedelta(days=7),
self.max_age_reached)
self.hq_updater.start()
self.project_updater.start()
io_loop.add_future(self.update_warrior_hq(), lambda fut: fut.result())
io_loop.start()
[docs] def max_age_reached(self):
if self.real_shutdown:
# time for an sanity reboot
logger.info("Running for more than 7 days. Time to schedule a reboot.")
self.reboot_gracefully()
# schedule a forced reboot after two days
self.schedule_forced_reboot()
[docs] def reboot_gracefully(self):
self.shut_down_flag = False
self.reboot_flag = True
self.fire_status()
if self.runner.is_active():
self.runner.set_current_pipeline(None)
else:
ioloop.IOLoop.instance().stop()
if self.real_shutdown:
os.system("sudo shutdown -r now")
[docs] def schedule_forced_reboot(self):
if self.real_shutdown and not self.forced_reboot_timeout:
self.forced_reboot_timeout = ioloop.IOLoop.instance().add_timeout(
datetime.timedelta(days=2), self.forced_reboot)
[docs] def forced_reboot(self):
logger.info("Stopping immediately...")
if self.real_shutdown:
os.system("sudo shutdown -r now")
[docs] def stop_gracefully(self):
self.shut_down_flag = True
self.reboot_flag = False
self.fire_status()
if self.runner.is_active():
self.runner.set_current_pipeline(None)
else:
ioloop.IOLoop.instance().stop()
if self.real_shutdown:
os.system("sudo shutdown -h now")
[docs] def forced_stop(self):
ioloop.IOLoop.instance().stop()
if self.real_shutdown:
os.system("sudo shutdown -h now")
[docs] def keep_running(self):
self.shut_down_flag = False
self.reboot_flag = False
ioloop.IOLoop.instance().add_future(
self.start_selected_project(), lambda fut: fut.result()
)
self.fire_status()
[docs] class Status(object):
UNINITIALIZED = 'UNINITIALIZED'
NO_PROJECT = "NO_PROJECT"
INVALID_SETTINGS = "INVALID_SETTINGS"
STOPPING_PROJECT = "STOPPING_PROJECT"
RESTARTING_PROJECT = "RESTARTING_PROJECT"
RUNNING_PROJECT = "RUNNING_PROJECT"
SWITCHING_PROJECT = "SWITCHING_PROJECT"
STARTING_PROJECT = "STARTING_PROJECT"
SHUTTING_DOWN = "SHUTTING_DOWN"
REBOOTING = "REBOOTING"
[docs] def fire_status(self):
self.on_status(self, self.warrior_status())
[docs] def warrior_status(self):
if self.shut_down_flag:
return Warrior.Status.SHUTTING_DOWN
elif self.reboot_flag:
return Warrior.Status.REBOOTING
elif realize(self.warrior_id) is None:
return Warrior.Status.UNINITIALIZED
elif not self.config_manager.all_valid():
return Warrior.Status.INVALID_SETTINGS
elif self.selected_project is None and \
self.current_project_name is None:
return Warrior.Status.NO_PROJECT
elif self.selected_project:
if self.selected_project == self.current_project_name:
return Warrior.Status.RUNNING_PROJECT
else:
return Warrior.Status.STARTING_PROJECT
else:
return Warrior.Status.STOPPING_PROJECT