'''The warrior web interface.'''
import collections
import hashlib
import json
import os
import os.path
import random
import re
import time
from sockjs.tornado import SockJSConnection, SockJSRouter
from tornado import web, ioloop
from seesaw.config import realize
from seesaw.web_util import AuthenticatedApplication
PUBLIC_PATH = os.path.abspath(
os.path.join(os.path.dirname(os.path.abspath(__file__)), "public"))
TEMPLATES_PATH = os.path.abspath(
os.path.join(os.path.dirname(os.path.abspath(__file__)), "templates"))
[docs]class IndexHandler(web.RequestHandler):
'''Shows the index.html.'''
[docs] def get(self):
self.render(os.path.join(PUBLIC_PATH, "index.html"),
timestamp=time.time())
[docs]class ItemMonitor(object):
'''Pushes item states and information to the client.'''
def __init__(self, item):
self.pipeline = item.pipeline
self.item = item
item.on_output += self.handle_item_output
item.on_task_status += self.handle_item_task_status
item.on_property += self.handle_item_property
item.on_complete += self.handle_item_complete
item.on_fail += self.handle_item_fail
item.on_cancel += self.handle_item_cancel
self.collected_data = collections.deque((), 500)
SeesawConnection.broadcast(
"pipeline.start_item",
{"pipeline_id": id(self.pipeline),
"item": self.item_for_broadcast()}
)
[docs] def item_for_broadcast(self):
item = self.item
tasks = []
for (task, task_name) in self.pipeline.ui_task_list():
tasks.append({
"id": id(task),
"name": task_name,
"status": (item.task_status[task]
if task in item.task_status else None)
})
if self.pipeline.project:
project_name = self.pipeline.project.title
else:
project_name = None
item_data = {
"id": item.item_id,
"name": ("Item %s" % item["item_name"]
if "item_name" in item else "New item"),
"number": item.item_number,
"status": self.item_status(),
"tasks": tasks,
"output": "".join(self.collected_data),
"project": project_name,
"start_time": item.start_time
}
return item_data
[docs] def item_status(self):
if self.item.completed:
return "completed"
elif self.item.failed:
return "failed"
elif self.item.canceled:
return "canceled"
else:
return "running"
[docs] def handle_item_output(self, item, data):
self.collected_data.append(data)
SeesawConnection.broadcast(
"item.output", {"item_id": item.item_id, "data": data})
[docs] def handle_item_task_status(self, item, task, new_status, old_status):
SeesawConnection.broadcast(
"item.task_status",
{
"item_id": item.item_id,
"task_id": id(task),
"new_status": new_status,
"old_status": old_status
}
)
[docs] def handle_item_property(self, item, key, new_value, old_value):
if key == "item_name":
SeesawConnection.broadcast(
"item.update_name",
{"item_id": item.item_id, "new_name": "Item %s" % new_value})
[docs] def handle_item_complete(self, item):
SeesawConnection.broadcast("item.complete", {"item_id": item.item_id})
[docs] def handle_item_fail(self, item):
SeesawConnection.broadcast("item.fail", {"item_id": item.item_id})
[docs] def handle_item_cancel(self, item):
SeesawConnection.broadcast("item.cancel", {"item_id": item.item_id})
[docs]class ApiHandler(web.RequestHandler):
'''Processes API requests.'''
[docs] def initialize(self, warrior=None, runner=None):
self.warrior = warrior
self.runner = runner
[docs] def get_template_path(self):
return TEMPLATES_PATH
[docs] def post(self, command):
if command == "stop":
if self.warrior:
self.warrior.stop_gracefully()
else:
self.runner.stop_gracefully()
self.write("OK")
elif command == "stop_now":
if self.warrior:
self.warrior.forced_stop()
else:
self.runner.forced_stop()
self.write("OK")
elif command == "keep_running":
if self.warrior:
self.warrior.keep_running()
else:
self.runner.keep_running()
self.write("OK")
elif command == "select-project":
self.warrior.config_manager.set_value(
"selected_project", self.get_argument("project_name"))
self.warrior.select_project(self.get_argument("project_name"))
self.write("OK")
elif command == "deselect-project":
self.warrior.config_manager.set_value("selected_project", "none")
self.warrior.select_project(None)
self.write("OK")
elif command == "settings":
posted_values = {}
for (name, value) in self.request.arguments.items():
value[0] = value[0].decode('utf8', 'replace')
if not self.warrior.config_manager.set_value(name, value[0]):
posted_values[name] = value[0]
if self.warrior.config_manager.all_valid():
self.warrior.fire_status()
self.render("settings.html", warrior=self.warrior,
posted_values=posted_values)
[docs] def get(self, command):
if command == "all-projects":
self.render("all-projects.html", warrior=self.warrior,
realize=realize)
elif command == "settings":
self.render("settings.html", warrior=self.warrior,
posted_values={})
elif command == "help":
self.render("help.html", warrior=self.warrior)
[docs]class SeesawConnection(SockJSConnection):
'''A WebSocket server that communicates the state of the warrior.'''
instance_id = ("%d-%f" % (os.getpid(), random.random()))
clients = set()
item_monitors = dict()
warrior = None
project = None
runner = None
[docs] def emit(self, event_name, message):
'''tornadoio to sockjs adapter.'''
self.send(json.dumps({'event_name': event_name, 'message': message}))
[docs] def on_open(self, info):
self.clients.add(self)
self.emit("instance_id", self.instance_id)
items = []
for item_monitor in self.item_monitors.values():
items.append(item_monitor.item_for_broadcast())
if self.project:
self.emit("project.refresh", {
"project": self.project.data_for_json(),
"status": ("stopping"
if self.runner.should_stop() else "running"),
"items": items
})
else:
self.emit("project.refresh", None)
if self.warrior:
self.emit("warrior.projects_loaded", {
"projects": self.warrior.projects
})
self.emit("warrior.status",
{"status": self.warrior.warrior_status()})
self.emit("warrior.broadcast_message",
{
"message": self.warrior.broadcast_message,
"hash": hash_string(self.warrior.broadcast_message)
})
@classmethod
[docs] def broadcast_bandwidth(cls):
if cls.warrior:
bw_stats = cls.warrior.bandwidth_stats()
if bw_stats:
cls.broadcast("bandwidth", bw_stats)
@classmethod
[docs] def broadcast_timestamp(cls):
cls.broadcast("timestamp", {"timestamp": time.time()})
@classmethod
[docs] def handle_warrior_status(cls, warrior, new_status):
cls.broadcast("warrior.status", {"status": new_status})
@classmethod
[docs] def handle_projects_loaded(cls, warrior, projects):
cls.broadcast_projects()
@classmethod
[docs] def broadcast_projects(cls):
cls.broadcast("warrior.projects_loaded", {
"projects": cls.warrior.projects
})
@classmethod
[docs] def handle_project_selected(cls, warrior, project):
cls.broadcast("warrior.project_selected", {"project": project})
@classmethod
[docs] def handle_project_installing(cls, warrior, project):
cls.broadcast("warrior.project_installing", {"project": project})
@classmethod
[docs] def handle_project_installed(cls, warrior, project, output):
output = re.sub("\r\n?", "\n", output)
cls.broadcast("warrior.project_installed",
{"project": project, "output": output})
@classmethod
[docs] def handle_project_installation_failed(cls, warrior, project, output):
output = re.sub("\r\n?", "\n", output)
cls.broadcast("warrior.project_installation_failed",
{"project": project, "output": output})
@classmethod
[docs] def handle_project_refresh(cls, warrior, project, runner):
cls.project = project
cls.runner = runner
cls.broadcast_project_refresh()
@classmethod
[docs] def handle_broadcast_message(cls, warrior, message):
cls.broadcast("warrior.broadcast_message",
{
"message": message,
"hash": hash_string(message)
})
@classmethod
[docs] def broadcast_project_refresh(cls):
if cls.project:
cls.broadcast("project.refresh", {
"project": cls.project.data_for_json(),
"status": ("stopping"
if cls.runner.should_stop() else "running"),
"items": []
})
else:
cls.broadcast("project.refresh", None)
@classmethod
[docs] def handle_runner_status(cls, runner, status):
cls.broadcast("runner.status", {
"status": ("stopping" if runner.should_stop() else "running")
})
@classmethod
[docs] def handle_start_item(cls, runner, pipeline, item):
cls.item_monitors[item] = ItemMonitor(item)
@classmethod
[docs] def handle_finish_item(cls, runner, pipeline, item):
del cls.item_monitors[item]
@classmethod
[docs] def broadcast(cls, event, message):
for client in cls.clients:
if message:
message["session_id"] = client.session.session_id
client.emit(event, message)
[docs] def on_message(self, message):
pass
[docs] def on_close(self):
self.clients.remove(self)
[docs]def hash_string(text):
'''Generate a digest for broadcast message.'''
return hashlib.md5((text or '').encode('ascii', 'replace')).hexdigest()
[docs]def start_runner_server(project, runner, bind_address="localhost", port_number=8001,
http_username=None, http_password=None):
'''Starts a web interface for a manually run pipeline.
Unlike :func:`start_warrior_server`, this UI does not contain an
configuration or project management panel.
'''
# if bind_address == "0.0.0.0":
# bind_address = ""
SeesawConnection.project = project
SeesawConnection.runner = runner
runner.on_pipeline_start_item += SeesawConnection.handle_start_item
runner.on_pipeline_finish_item += SeesawConnection.handle_finish_item
runner.on_status += SeesawConnection.handle_runner_status
ioloop.PeriodicCallback(SeesawConnection.broadcast_timestamp, 1000).start()
router = SockJSRouter(SeesawConnection)
application = AuthenticatedApplication(
router.apply_routes([
(r"/(.*\.(html|css|js|swf|png|ico))$",
web.StaticFileHandler, {"path": PUBLIC_PATH}),
("/", IndexHandler),
("/api/(.+)$", ApiHandler, {"runner": runner})]),
# flash_policy_port = 843,
# flash_policy_file=os.path.join(PUBLIC_PATH, "flashpolicy.xml"),
socket_io_address=bind_address,
socket_io_port=port_number,
# settings for AuthenticatedApplication
auth_enabled=(http_password or "").strip() != "",
check_auth=lambda r, username, password:
(
password == http_password and
(http_username or "").strip() in ["", username]
),
auth_realm="ArchiveTeam Warrior",
skip_auth=[r"^/socket\.io/1/websocket/[a-z0-9]+$"]
)
application.listen(port_number, bind_address)
[docs]def start_warrior_server(warrior, bind_address="localhost", port_number=8001,
http_username=None, http_password=None):
'''Starts the warrior web interface.'''
SeesawConnection.warrior = warrior
warrior.on_projects_loaded += SeesawConnection.handle_projects_loaded
warrior.on_project_refresh += SeesawConnection.handle_project_refresh
warrior.on_project_installing += SeesawConnection.handle_project_installing
warrior.on_project_installed += SeesawConnection.handle_project_installed
warrior.on_project_installation_failed += \
SeesawConnection.handle_project_installation_failed
warrior.on_project_selected += SeesawConnection.handle_project_selected
warrior.on_broadcast_message_received += SeesawConnection.handle_broadcast_message
warrior.on_status += SeesawConnection.handle_warrior_status
warrior.runner.on_pipeline_start_item += SeesawConnection.handle_start_item
warrior.runner.on_pipeline_finish_item += \
SeesawConnection.handle_finish_item
warrior.runner.on_status += SeesawConnection.handle_runner_status
if not http_username:
http_username = warrior.http_username
if not http_password:
http_password = warrior.http_password
ioloop.PeriodicCallback(SeesawConnection.broadcast_bandwidth, 1000).start()
ioloop.PeriodicCallback(SeesawConnection.broadcast_timestamp, 1000).start()
router = SockJSRouter(SeesawConnection)
application = AuthenticatedApplication(
router.apply_routes([
(r"/(.*\.(html|css|js|swf|png|ico))$",
web.StaticFileHandler, {"path": PUBLIC_PATH}),
("/", IndexHandler),
("/api/(.+)$", ApiHandler, {"warrior": warrior})]),
# flash_policy_port = 843,
# flash_policy_file = os.path.join(PUBLIC_PATH, "flashpolicy.xml"),
socket_io_address=bind_address,
socket_io_port=port_number,
# settings for AuthenticatedApplication
auth_enabled=lambda: (realize(http_password) or "").strip() != "",
check_auth=lambda r, username, password:
(
password == realize(http_password) and
(realize(http_username) or "").strip() in ["", username]
),
auth_realm="ArchiveTeam Warrior",
skip_auth=[tornado_url[0] for tornado_url in router.urls]
)
application.listen(port_number, bind_address)