own worker and follower

Daniel Tsvetkov 2020-03-20 12:44:42 +01:00
parent 75da26dbb9
commit a4ba0d5d87
14 changed files with 281 additions and 6908 deletions

View File

@@ -16,7 +16,7 @@ Usage $0 [ bootstrap | worker | web ]
 "
 worker () {
-    celery worker --app=tasks.worker.worker_app --concurrency=1 --loglevel=INFO
+    python worker.py
 }
 web () {

View File

@@ -11,8 +11,9 @@ def init_db(app):
     app.config["SQLALCHEMY_DATABASE_URI"] = SQLALCHEMY_DATABASE_URI
     app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False
-    from oshipka.webapp import test_bp
+    from oshipka.webapp import test_bp, oshipka_bp
     app.register_blueprint(test_bp)
+    app.register_blueprint(oshipka_bp)
     db.init_app(app)
     for dir in MAKEDIRS:

View File

@@ -15,5 +15,9 @@ test_bp = Blueprint('test_bp', __name__,
                     static_folder='static',
                     )
-import oshipka.webapp.tasks_routes
-import oshipka.webapp.websockets_routes
+oshipka_bp = Blueprint('oshipka_bp', __name__,
+                       template_folder='templates',
+                       static_folder='static',
+                       )
+import oshipka.webapp.async_routes

View File

@@ -0,0 +1,132 @@
import itertools
import json
import os
import subprocess
import time
from uuid import uuid4

from flask import render_template, request, Response, redirect, url_for, jsonify
from flask_table import Table, LinkCol, Col

from oshipka.persistance import db
from oshipka.webapp import test_bp, oshipka_bp
from config import TASKS_BUF_DIR

TASKS = {}


def register_task(task_name, task_func, *args, **kwargs):
    TASKS[task_name] = task_func


def stateless_task():
    for i, c in enumerate(itertools.cycle('\|/-')):
        yield "data: %s %d\n\n" % (c, i)
        time.sleep(.1)  # an artificial delay


register_task("stateless_task", stateless_task)


def stateful_task(*args, **kwargs):
    n = 100
    for i in range(0, n + 1):
        print(i)
        time.sleep(1)


register_task("stateful_task", stateful_task)


class Task(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.Unicode)
    uuid = db.Column(db.Unicode)
    status = db.Column(db.Unicode, default="NOT_STARTED")
    func_name = db.Column(db.Unicode)
    args = db.Column(db.Unicode, default="[]")
    kwargs = db.Column(db.Unicode, default="{}")

    def serialize(self):
        return dict(
            name=self.name, uuid=self.uuid, kwargs=json.loads(self.kwargs),
            status=self.status,
        )


class TasksTable(Table):
    uuid = LinkCol('Task', "test_bp.get_task_status", url_kwargs=dict(task_uuid='uuid'))
    name = Col('name')


@test_bp.route("/tasks", methods=["GET"])
def list_tasks():
    tasks = Task.query.all()
    tasks_table = TasksTable(tasks)
    return render_template("test/tasks.html",
                           runnable_tasks=TASKS.keys(),
                           tasks_table=tasks_table)


def tail(filename):
    process = subprocess.Popen(['tail', '-F', filename],
                               stdout=subprocess.PIPE,
                               stderr=subprocess.STDOUT,
                               universal_newlines=True,
                               )
    for line in iter(process.stdout.readline, b''):
        line = line.strip()
        # print("sending s_tail: {}".format(line))
        yield "data: {}\n\n".format(line)
        # print("sent s_tail: {}".format((line)))


def worker_start_task(task_name, func_name, task_kwargs):
    uuid = str(uuid4())
    task = Task(name=task_name,
                uuid=uuid,
                kwargs=json.dumps(task_kwargs),
                func_name=func_name
                )
    db.session.add(task)
    db.session.commit()
    return uuid


@test_bp.route('/tasks/<task_name>/start', methods=['GET', 'POST'])
def start_task(task_name):
    task_kwargs = {k: v for k, v in request.form.items() if k != 'csrf_token'}
    func_name = "oshipka.webapp.async_routes.{}".format(TASKS.get(task_name).__name__)
    async_task_id = worker_start_task(task_name, func_name, task_kwargs)
    return redirect(url_for('test_bp.get_task_status', task_uuid=async_task_id))


def get_task_ctx(task_uuid):
    ctx = {}
    task = Task.query.filter_by(uuid=task_uuid).first()
    ctx['task'] = task.serialize()
    out_filename = os.path.join(TASKS_BUF_DIR, task_uuid)
    if os.path.exists(out_filename):
        with open(out_filename) as f:
            ctx['stdout'] = f.read()
    return ctx


@test_bp.route('/task_status/<task_uuid>')
def get_task_status(task_uuid):
    ctx = get_task_ctx(task_uuid)
    return render_template("test/task_status.html", **ctx)


@oshipka_bp.route('/stream/<task_uuid>')
def stream(task_uuid):
    if request.headers.get('accept') == 'text/event-stream':
        task = Task.query.filter_by(uuid=task_uuid).first()
        if not task:
            return jsonify({"error": "no task with uuid {}".format(task_uuid)}), 404
        # return Response(stateless_task(), content_type='text/event-stream')
        return Response(tail(os.path.join(TASKS_BUF_DIR, task_uuid)), content_type='text/event-stream')
    return jsonify({"error": "Request has to contain 'Accept: text/event-stream' header"}), 400
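A quick illustration of the API this new file introduces (not part of the commit; names are hypothetical): a project can register its own background task through register_task. Note that start_task above builds the dotted path as oshipka.webapp.async_routes.<function name>, so the registered function has to be importable from that module.

# Hypothetical task registration against the helpers defined above;
# "rebuild_index" is an illustrative name, not something in this commit.
import time

from oshipka.webapp.async_routes import register_task


def rebuild_index(*args, **kwargs):
    for step in range(5):
        print("indexing batch {}".format(step))  # captured to TASKS_BUF_DIR/<uuid> by the worker
        time.sleep(1)


register_task("rebuild_index", rebuild_index)

# Flow: GET /tasks lists "rebuild_index" as runnable; /tasks/rebuild_index/start creates a
# Task row with status NOT_STARTED; worker.py queues and runs it; /task_status/<uuid> then
# follows the captured stdout via the /stream/<uuid> server-sent-events endpoint.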

File diff suppressed because it is too large

View File

@@ -1,156 +0,0 @@
import json
import os
import subprocess
import threading

from flask import redirect, request, url_for, jsonify, render_template
from flask_socketio import emit
from flask_table import Table, LinkCol, Col

from oshipka.persistance import db
from oshipka.webapp import test_bp, socketio
from oshipka.worker import worker_app
from config import TASKS_BUF_DIR


class Task(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.Unicode)
    uuid = db.Column(db.Unicode)
    kwargs = db.Column(db.Unicode)

    def serialize(self):
        return dict(
            name=self.name, uuid=self.uuid, kwargs=json.loads(self.kwargs),
        )


class TasksTable(Table):
    uuid = LinkCol('Task', "test_bp.get_task_status", url_kwargs=dict(task_uuid='uuid'))
    name = Col('name')


def worker_start_task(task_name, task_kwargs):
    async_task = worker_app.send_task(task_name, [], task_kwargs)
    task = Task(name=task_name,
                uuid=async_task.id,
                kwargs=json.dumps(task_kwargs),
                )
    db.session.add(task)
    db.session.commit()
    return async_task.id


def get_task_ctx(task_uuid):
    ctx = {}
    async_task = worker_app.AsyncResult(id=task_uuid)
    ctx['async_task'] = {
        'result': async_task.result,
        'status': async_task.status,
    }
    task = Task.query.filter_by(uuid=async_task.id).first()
    ctx['task'] = task.serialize()
    out_filename = os.path.join(TASKS_BUF_DIR, task_uuid)
    if os.path.exists(out_filename):
        with open(out_filename) as f:
            ctx['async_task']['stdout'] = f.read()
    return ctx


@test_bp.route("/tasks", methods=["GET"])
def list_tasks():
    tasks = Task.query.all()
    tasks_table = TasksTable(tasks)
    return render_template("test/tasks.html", tasks_table=tasks_table)


@test_bp.route('/tasks/<task_name>/start', methods=['GET', 'POST'])
def start_task(task_name):
    task_kwargs = {k: v for k, v in request.form.items() if k != 'csrf_token'}
    async_task_id = worker_start_task(task_name, task_kwargs)
    return redirect(url_for('test_bp.get_task_status', task_uuid=async_task_id))


@test_bp.route('/tasks/<task_uuid>/status')
def get_task_status(task_uuid):
    ctx = get_task_ctx(task_uuid)
    return render_template('test/task.html', **ctx)


@test_bp.route('/tasks/<task_uuid>')
def get_async_task_result(task_uuid):
    ctx = get_task_ctx(task_uuid)
    return jsonify(ctx)


PROCESSES = {}


def start_tail_process(filename):
    p = subprocess.Popen(['tail', '-F', filename],
                         stdout=subprocess.PIPE,
                         stderr=subprocess.STDOUT,
                         universal_newlines=True,
                         )
    return p


def tail(process):
    for line in iter(process.stdout.readline, b''):
        line = line.strip()
        print("sending s_tail: {}".format(line))
        emit("s_tail", {"stdout": line})
        print("sent s_tail: {}".format((line)))


def kill_process(process):
    if process:
        # print('process: killing {}'.format(process))
        process.kill()
        print('process: killed {}'.format(process))


def remove_process(uuid):
    if uuid in PROCESSES:
        del PROCESSES[uuid]


@socketio.on('connect')
def handle_connect():
    sid = str(request.sid)
    print("socket.connect: session id: {}".format(sid))
    emit('s_connect', {
        "sid": sid,
    })


@socketio.on('start_tail')
def handle_start_tail(json):
    sid = str(request.sid)
    task_uuid = json.get("task_uuid")
    task_out_filename = os.path.join(TASKS_BUF_DIR, task_uuid)
    process = start_tail_process(task_out_filename)
    PROCESSES[sid] = ({
        "sid": sid,
        "process": process,
    })
    print("socket.start_tail: session id: {}".format(sid))
    emit('s_start_tail', {
        "sid": sid,
        "task_uuid": task_uuid,
    })
    tail(process)


@socketio.on('disconnect')
def handle_disconnect():
    sid = str(request.sid)
    process = PROCESSES.get(sid, {}).get("process")
    print("socket.disconnect: session id: {}".format(sid))
    kill_process(process)
    remove_process(process)
    emit('s_dissconnect', {
        "sid": sid,
    })

View File

@@ -1,3 +1,2 @@
 Test
-<a href="{{ url_for('test_bp.websockets') }}">websockets</a> |
 <a href="{{ url_for('test_bp.list_tasks') }}">tasks</a> |

View File

@@ -1,45 +0,0 @@
{% extends "test/layout.html" %}

{% block aside %}
{% endblock %}

{% block content %}
    <p><strong>Status:</strong>{{ async_task.status }}</p>
    <p><strong>Result:</strong>{{ async_task.result }}</p>
    <p><strong>Task ID:</strong><span id="task_uuid">{{ task.uuid }}</span></p>
    <p><strong>Task name:</strong>{{ task.name }}</p>
    <p><strong>Task kwargs:</strong>{{ task.kwargs }}</p>
    <p><strong>Task out:</strong><span id="task_out"></span></p>
{% endblock %}

{% block script %}
    <script src="{{ url_for('test_bp.static', filename='js/socket.io.js') }}"></script>
    <script>
        var task_uuid = $('#task_uuid').text();

        function connectServer() {
            socket = io.connect();
            socket.on('s_connect', function (e) {
                console.log("s_connect");
            });
            socket.emit("start_tail", {task_uuid: task_uuid});
            socket.on('s_start_tail', function (e) {
                console.log('server start tail', e);
            });
            socket.on('s_tail', function (o) {
                console.log('server tail', o);
                $('#task_out').text(o.stdout);
            });
            socket.on('s_disconnect', function (e) {
                console.log('server disconnected', e);
            });
        }

        connectServer()
    </script>
{% endblock %}

View File

@@ -0,0 +1,25 @@
{% extends "test/layout.html" %}

{% block aside %}
{% endblock %}

{% block content %}
    <p><strong>Task ID:</strong><span id="task_uuid">{{ task.uuid }}</span></p>
    <p><strong>Task name:</strong>{{ task.name }}</p>
    <p><strong>Task kwargs:</strong>{{ task.kwargs }}</p>
    <p><strong>Task out:</strong><span id="task_out">{{ output }}</span></p>
{% endblock %}

{% block script %}
    <script>
        var TASK_NAME = "{{ task.uuid }}";

        if (!!window.EventSource) {
            var source = new EventSource('/stream/' + TASK_NAME);
            source.onmessage = function (e) {
                $("#task_out").text(e.data);
            }
        }
    </script>
{% endblock %}
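The template consumes /stream/<task_uuid> with the browser's EventSource; the same endpoint can also be exercised outside a browser. A rough sketch using requests, assuming the app is served on localhost:5000 and that the requests package is installed (neither is part of this commit):

# Rough client-side check of the SSE endpoint; host, port, and the `requests`
# dependency are assumptions for illustration only.
import requests

task_uuid = "00000000-0000-0000-0000-000000000000"  # placeholder uuid
resp = requests.get(
    "http://localhost:5000/stream/{}".format(task_uuid),
    headers={"Accept": "text/event-stream"},  # required by the stream() view
    stream=True,
)
for line in resp.iter_lines(decode_unicode=True):
    if line and line.startswith("data:"):
        print(line[len("data:"):].strip())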

View File

@@ -5,7 +5,9 @@
 {% endblock %}
 {% block content %}
-    <a href="{{ url_for('test_bp.start_task', task_name='long_running_task') }}">Start long_running_task</a>
+    {% for runnable_task in runnable_tasks %}
+        <a href="{{ url_for('test_bp.start_task', task_name=runnable_task) }}">Start {{runnable_task}}</a>
+    {% endfor %}
     <hr>
     {{ tasks_table }}
 {% endblock %}

View File

@@ -1,19 +0,0 @@
{% extends "test/layout.html" %}

{% block content %}
{% endblock %}

{% block script %}
    <script src="{{ url_for('test_bp.static', filename='js/socket.io.js') }}"></script>
    <script>
        var socket = io();
        socket.on('connect', function () {
            console.log("SOCKETIO: sending SYN.");
            socket.emit('SYN', {data: 'SYN'});
            console.log("SOCKETIO: sent SYN.");
        });
        socket.on('SYN-ACK', function (data) {
            console.log("SOCKETIO: rcvd SYN-ACK: " + data);
        });
    </script>
{% endblock %}

View File

@@ -1,17 +0,0 @@
from flask import render_template
from flask_socketio import emit

from oshipka.webapp import test_bp, socketio


@test_bp.route('/websockets')
def websockets():
    return render_template("test/websockets.html")


@socketio.on('SYN')
def handle_my_custom_namespace_event(json):
    print('SOCKETIO: rcvd SYN: {}'.format(json))
    print('SOCKETIO: sending SYN-ACK')
    emit("SYN-ACK", {"data": 'SYN-ACK'})
    print('SOCKETIO: sent SYN-ACK')

View File

@@ -1,27 +1,29 @@
+import json
 import os
+import queue
+import signal
 import sys
+import threading
+from importlib import import_module
+from time import sleep
-from celery import Celery
-from celery.signals import task_prerun
-from config import TASKS_IN_DIR, TASKS_PROC_DIR, TASKS_BUF_DIR
-from oshipka.persistance import db
-worker_app = Celery(__name__)
-worker_app.conf.update({
-    'broker_url': 'filesystem://',
-    'result_backend': "file://{}".format(TASKS_PROC_DIR),
-    'broker_transport_options': {
-        'data_folder_in': TASKS_IN_DIR,
-        'data_folder_out': TASKS_IN_DIR,
-        'data_folder_processed': TASKS_PROC_DIR,
-    },
-    'imports': ('tasks',),
-    'result_persistent': True,
-    'task_serializer': 'json',
-    'result_serializer': 'json',
-    'accept_content': ['json']})
+from oshipka.persistance import init_db, db
+from oshipka.webapp import app
+from oshipka.webapp.async_routes import Task
+from config import TASKS_BUF_DIR
+
+q = queue.Queue()
+POISON_PILL = "-.-.-.-"
+
+
+def dyn_import(name):
+    p, m = name.rsplit('.', 1)
+
+    mod = import_module(p)
+    func = getattr(mod, m)
+
+    return func
 
 
 class Unbuffered(object):
@@ -40,11 +42,95 @@ class Unbuffered(object):
         return getattr(self.stream, attr)
 
 
-@task_prerun.connect
-def before_task(task_id=None, task=None, *args, **kwargs):
-    from oshipka import app
-    sys.stdout = Unbuffered(open(os.path.join(TASKS_BUF_DIR, task_id), 'w'))
-    sys.stderr = Unbuffered(open(os.path.join(TASKS_BUF_DIR, task_id), 'w'))
-    db.init_app(app)
-    app.app_context().push()
-    app.test_request_context().push()
+class Worker(object):
+    def __init__(self):
+        self.stored_sys_stdout = None
+        self.stored_sys_stderr = None
+
+    def before_task(self, task_uuid):
+        self.stored_sys_stdout = sys.stdout
+        self.stored_sys_stdout = sys.stdout
+        sys.stdout = Unbuffered(open(os.path.join(TASKS_BUF_DIR, task_uuid), 'w'))
+        sys.stderr = Unbuffered(open(os.path.join(TASKS_BUF_DIR, task_uuid), 'w'))
+
+    def after_task(self, task_uuid):
+        sys.stdout = self.stored_sys_stdout
+        sys.stdout = self.stored_sys_stderr
+
+    def start(self):
+        app.app_context().push()
+        app.test_request_context().push()
+        worker_id = threading.get_ident()
+        print("Started worker {}".format(worker_id))
+        while True:
+            print("Worker {} waiting for tasks...".format(worker_id))
+            task_uuid = q.get()
+            if task_uuid == POISON_PILL:
+                print("Killing worker {}".format(worker_id))
+                break
+            task = Task.query.filter_by(uuid=task_uuid).first()
+            print("Worker {} received task: {}".format(worker_id, task.name))
+            task.status = "STARTED"
+            db.session.add(task)
+            db.session.commit()
+            print("Worker {} started task: {}".format(worker_id, task.name))
+            task_func = dyn_import(task.func_name)
+            self.before_task(task.uuid)
+            task_func(*json.loads(task.args), **json.loads(task.kwargs))
+            self.after_task(task.uuid)
+            print("Worker {} finished task: {}".format(worker_id, task.name))
+            task.status = "DONE"
+            db.session.add(task)
+            db.session.commit()
+
+
+RUNNING = True
+
+
+def signal_handler(signal, frame):
+    global RUNNING
+    print("\nReceived kill signal, ending the workers gracefully...")
+    RUNNING = False
+
+
+def main(workers_cnt=4):
+    global RUNNING
+    signal.signal(signal.SIGINT, signal_handler)
+    app.app_context().push()
+    app.test_request_context().push()
+    init_db(app)
+
+    thread_pool = []
+    for w in range(workers_cnt):
+        worker = Worker()
+        t = threading.Thread(target=worker.start, args=())
+        thread_pool.append(t)
+        t.start()
+
+    print("Started main loop")
+    while RUNNING:
+        not_started_tasks = Task.query.filter_by(status="NOT_STARTED").all()
+        for task in not_started_tasks:
+            task.status = "QUEUED"
+            db.session.add(task)
+            db.session.commit()
+            q.put(task.uuid)
+        sleep(1)
+    print("exiting main loop")
+
+    for t in thread_pool:
+        q.put(POISON_PILL)
+        t.join()
+
+
+if __name__ == "__main__":
+    main()
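Two small patterns carry the new worker: resolving a task function from its dotted path (dyn_import) and shutting worker threads down with a poison-pill sentinel on the queue. A self-contained sketch of both; the queued value below is a stand-in, not a real Task.func_name from this commit:

# Standalone illustration of the dyn_import + poison-pill pattern used by worker.py.
import queue
import threading
from importlib import import_module

POISON_PILL = "-.-.-.-"
q = queue.Queue()


def dyn_import(name):
    # "pkg.module.func" -> the function object, same approach as worker.py
    p, m = name.rsplit('.', 1)
    return getattr(import_module(p), m)


def worker_loop():
    while True:
        item = q.get()
        if item == POISON_PILL:  # sentinel tells the thread to exit cleanly
            break
        func = dyn_import(item)
        print(func("/tmp/example.txt"))


t = threading.Thread(target=worker_loop)
t.start()
q.put("os.path.basename")  # stand-in for a real dotted task path
q.put(POISON_PILL)
t.join()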

View File

@@ -1,28 +1,9 @@
-amqp==2.5.2
-Babel==2.8.0
-billiard==3.6.3.0
-celery==4.4.2
 click==7.1.1
-dnspython==1.16.0
-eventlet==0.25.1
 Flask==1.1.1
-Flask-Babel==1.0.0
-Flask-Celery==2.4.3
-Flask-Script==2.0.6
-Flask-SocketIO==4.2.1
-Flask-Table==0.5.0
-greenlet==0.4.15
-importlib-metadata==1.5.0
+Flask-SQLAlchemy==2.4.1
 itsdangerous==1.1.0
 Jinja2==2.11.1
-kombu==4.6.8
 MarkupSafe==1.1.1
-monotonic==1.5
 pkg-resources==0.0.0
-python-engineio==3.12.1
-python-socketio==4.5.0
-pytz==2019.3
-six==1.14.0
-vine==1.3.0
+SQLAlchemy==1.3.15
 Werkzeug==1.0.0
-zipp==3.1.0