oshipka/oshipka/webapp/tasks_routes.py

157 lines
4.1 KiB
Python

import json
import os
import subprocess
import threading
from flask import redirect, request, url_for, jsonify, render_template
from flask_socketio import emit
from flask_table import Table, LinkCol, Col
from oshipka.persistance import db
from oshipka.webapp import test_bp, socketio
from oshipka.worker import worker_app
from config import TASKS_BUF_DIR
class Task(db.Model):
    """Database record of a task that was sent to the worker."""

    id = db.Column(db.Integer, primary_key=True)
    # Name the task was sent to the worker under.
    name = db.Column(db.Unicode)
    # Id of the worker's async result for this task.
    uuid = db.Column(db.Unicode)
    # Keyword arguments of the task, stored as a JSON string.
    kwargs = db.Column(db.Unicode)

    def serialize(self):
        """Return the record as a plain dict, with ``kwargs`` decoded from JSON."""
        return {
            "name": self.name,
            "uuid": self.uuid,
            "kwargs": json.loads(self.kwargs),
        }
class TasksTable(Table):
    """Table of tasks: a link to each task's status page, then its name."""

    # The row's ``uuid`` attribute fills the ``task_uuid`` argument of the
    # linked endpoint.
    uuid = LinkCol(
        'Task',
        "test_bp.get_task_status",
        url_kwargs={'task_uuid': 'uuid'},
    )
    name = Col('name')
def worker_start_task(task_name, task_kwargs):
    """Send a task to the worker and record it in the database.

    :param task_name: name of the worker task to send.
    :param task_kwargs: keyword arguments for the task; they are also stored
        on the record as JSON, so they must be JSON-serializable.
    :return: id of the worker's async result.
    """
    result = worker_app.send_task(task_name, [], task_kwargs)
    record = Task(
        name=task_name,
        uuid=result.id,
        kwargs=json.dumps(task_kwargs),
    )
    session = db.session
    session.add(record)
    session.commit()
    return result.id
def get_task_ctx(task_uuid):
    """Collect what is known about a task, for rendering or JSON output.

    :param task_uuid: id of the worker's async result; also the name of the
        task's output file inside ``TASKS_BUF_DIR``.
    :return: dict with two keys. ``async_task`` holds ``result`` and
        ``status`` of the async result plus, when the output file exists,
        its content under ``stdout``. ``task`` holds the serialized database
        record, or ``None`` when no record has this uuid.
    """
    async_task = worker_app.AsyncResult(id=task_uuid)
    ctx = {
        'async_task': {
            'result': async_task.result,
            'status': async_task.status,
        },
    }

    task = Task.query.filter_by(uuid=async_task.id).first()
    # An unknown uuid has no database row; report that as None instead of
    # failing with AttributeError on ``None.serialize()``.
    ctx['task'] = task.serialize() if task is not None else None

    out_filename = os.path.join(TASKS_BUF_DIR, task_uuid)
    # Open directly instead of checking for existence first, so a file that
    # disappears between the check and the open cannot raise.
    try:
        with open(out_filename) as f:
            ctx['async_task']['stdout'] = f.read()
    except FileNotFoundError:
        pass
    return ctx
@test_bp.route("/tasks", methods=["GET"])
def list_tasks():
    """Render the page that lists every recorded task in a table."""
    table = TasksTable(Task.query.all())
    return render_template("test/tasks.html", tasks_table=table)
@test_bp.route('/tasks/<task_name>/start', methods=['GET', 'POST'])
def start_task(task_name):
    """Start the named task and redirect to its status page.

    The submitted form fields, except ``csrf_token``, become the task's
    keyword arguments.
    """
    task_kwargs = dict(request.form.items())
    task_kwargs.pop('csrf_token', None)
    task_id = worker_start_task(task_name, task_kwargs)
    status_url = url_for('test_bp.get_task_status', task_uuid=task_id)
    return redirect(status_url)
@test_bp.route('/tasks/<task_uuid>/status')
def get_task_status(task_uuid):
    """Render the HTML status page of one task."""
    return render_template('test/task.html', **get_task_ctx(task_uuid))
@test_bp.route('/tasks/<task_uuid>')
def get_async_task_result(task_uuid):
    """Return the context of one task as a JSON response."""
    return jsonify(get_task_ctx(task_uuid))
# Registry of started ``tail`` subprocesses, keyed by socket session id.
# Each value is a dict with the keys "sid" and "process".
PROCESSES = {}
def start_tail_process(filename):
    """Spawn ``tail -F`` on *filename* and return the process.

    stderr is merged into stdout, and stdout is a text-mode pipe.
    """
    command = ['tail', '-F', filename]
    return subprocess.Popen(
        command,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        universal_newlines=True,
    )
def tail(process):
    """Forward every line of the process's stdout to the client as ``s_tail``.

    Blocks until the stream reaches end-of-file, for example after the
    process has been killed.

    :param process: process object with a readable ``stdout`` pipe.
    """
    # readline() returns an empty value only at end-of-file. Comparing it
    # with the bytes sentinel b'' never matches on a text-mode pipe, which
    # made the loop spin forever at EOF; a truthiness test ends the loop for
    # both text and bytes streams.
    while True:
        line = process.stdout.readline()
        if not line:
            break
        line = line.strip()
        print("sending s_tail: {}".format(line))
        emit("s_tail", {"stdout": line})
        print("sent s_tail: {}".format((line)))
def kill_process(process):
    """Kill a subprocess and reap it; do nothing when *process* is falsy.

    :param process: ``subprocess.Popen`` instance, or ``None``.
    """
    if not process:
        return
    process.kill()
    # Collect the exit status so the killed child does not linger as a
    # zombie. The wait is bounded so a stuck child cannot hang the caller.
    try:
        process.wait(timeout=5)
    except subprocess.TimeoutExpired:
        print('process: still running after kill {}'.format(process))
        return
    print('process: killed {}'.format(process))
def remove_process(uuid):
    """Drop the ``PROCESSES`` entry stored under *uuid*, if there is one."""
    PROCESSES.pop(uuid, None)
@socketio.on('connect')
def handle_connect():
    """Log a new socket connection and send the client its session id."""
    session_id = str(request.sid)
    print("socket.connect: session id: {}".format(session_id))
    payload = dict(sid=session_id)
    emit('s_connect', payload)
@socketio.on('start_tail')
def handle_start_tail(json):
    """Stream a task's output file to the requesting client.

    Starts ``tail -F`` on the task's file in ``TASKS_BUF_DIR``, registers the
    process under the client's session id, acknowledges with
    ``s_start_tail`` and then forwards output lines until the stream ends.
    A request without a valid ``task_uuid`` is logged and ignored.

    :param json: event payload; its ``task_uuid`` entry names the task.
        The parameter shadows the ``json`` module inside this function.
    """
    sid = str(request.sid)
    task_uuid = json.get("task_uuid")

    # The payload is client input: accept only a plain file name, so the
    # joined path cannot leave TASKS_BUF_DIR (e.g. via "../") and a missing
    # value does not crash os.path.join.
    if (not isinstance(task_uuid, str)
            or task_uuid in ('', '.', '..')
            or '\0' in task_uuid
            or os.path.basename(task_uuid) != task_uuid):
        print("socket.start_tail: session id: {}: invalid task_uuid".format(sid))
        return

    # A repeated request from the same session replaces its registry entry;
    # kill the earlier process first so it is not left running unreferenced.
    kill_process(PROCESSES.get(sid, {}).get("process"))

    task_out_filename = os.path.join(TASKS_BUF_DIR, task_uuid)
    process = start_tail_process(task_out_filename)
    PROCESSES[sid] = {
        "sid": sid,
        "process": process,
    }
    print("socket.start_tail: session id: {}".format(sid))
    emit('s_start_tail', {
        "sid": sid,
        "task_uuid": task_uuid,
    })
    tail(process)
@socketio.on('disconnect')
def handle_disconnect():
    """Stop the disconnecting client's tail process and forget it."""
    sid = str(request.sid)
    process = PROCESSES.get(sid, {}).get("process")
    print("socket.disconnect: session id: {}".format(sid))
    kill_process(process)
    # PROCESSES is keyed by session id, so the entry must be removed by sid;
    # passing the process object never matched a key and left entries behind.
    remove_process(sid)
    # The event name's spelling is kept as-is: it is part of the wire
    # protocol that clients may listen for.
    emit('s_dissconnect', {
        "sid": sid,
    })