process-based worker

This commit is contained in:
Daniel Tsvetkov 2020-03-23 11:56:29 +01:00
parent a4ba0d5d87
commit 0e062ccb7f
3 changed files with 104 additions and 58 deletions

View File

@@ -1,18 +1,16 @@
import itertools
import json import json
import os import os
import subprocess import subprocess
import time import time
from uuid import uuid4 from uuid import uuid4
from config import TASKS_BUF_DIR
from flask import render_template, request, Response, redirect, url_for, jsonify from flask import render_template, request, Response, redirect, url_for, jsonify
from flask_table import Table, LinkCol, Col from flask_table import Table, LinkCol, Col
from oshipka.persistance import db from oshipka.persistance import db
from oshipka.webapp import test_bp, oshipka_bp from oshipka.webapp import test_bp, oshipka_bp
from config import TASKS_BUF_DIR
TASKS = {} TASKS = {}
@@ -20,17 +18,8 @@ def register_task(task_name, task_func, *args, **kwargs):
TASKS[task_name] = task_func TASKS[task_name] = task_func
def stateless_task():
for i, c in enumerate(itertools.cycle('\|/-')):
yield "data: %s %d\n\n" % (c, i)
time.sleep(.1) # an artificial delay
register_task("stateless_task", stateless_task)
def stateful_task(*args, **kwargs): def stateful_task(*args, **kwargs):
n = 100 n = 5
for i in range(0, n + 1): for i in range(0, n + 1):
print(i) print(i)
time.sleep(1) time.sleep(1)
@@ -79,9 +68,7 @@ def tail(filename):
) )
for line in iter(process.stdout.readline, b''): for line in iter(process.stdout.readline, b''):
line = line.strip() line = line.strip()
# print("sending s_tail: {}".format(line))
yield "data: {}\n\n".format(line) yield "data: {}\n\n".format(line)
# print("sent s_tail: {}".format((line)))
def worker_start_task(task_name, func_name, task_kwargs): def worker_start_task(task_name, func_name, task_kwargs):
@@ -127,6 +114,5 @@ def stream(task_uuid):
task = Task.query.filter_by(uuid=task_uuid).first() task = Task.query.filter_by(uuid=task_uuid).first()
if not task: if not task:
return jsonify({"error": "no task with uuid {}".format(task_uuid)}), 404 return jsonify({"error": "no task with uuid {}".format(task_uuid)}), 404
# return Response(stateless_task(), content_type='text/event-stream')
return Response(tail(os.path.join(TASKS_BUF_DIR, task_uuid)), content_type='text/event-stream') return Response(tail(os.path.join(TASKS_BUF_DIR, task_uuid)), content_type='text/event-stream')
return jsonify({"error": "Request has to contain 'Accept: text/event-stream' header"}), 400 return jsonify({"error": "Request has to contain 'Accept: text/event-stream' header"}), 400

View File

@@ -1,20 +1,25 @@
import json import json
import multiprocessing
import os import os
import queue import queue
import signal import signal
import time
import sys import sys
import threading import threading
from copy import deepcopy
from importlib import import_module from importlib import import_module
from time import sleep from time import sleep
from filelock import FileLock, Timeout
from oshipka.persistance import init_db, db from oshipka.persistance import init_db, db
from oshipka.webapp import app from oshipka.webapp import app
from oshipka.webapp.async_routes import Task from oshipka.webapp.async_routes import Task
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
from config import TASKS_BUF_DIR from config import TASKS_BUF_DIR, TASKS_IN_DIR
q = queue.Queue()
POISON_PILL = "-.-.-.-"
def dyn_import(name): def dyn_import(name):
@@ -42,51 +47,101 @@ class Unbuffered(object):
return getattr(self.stream, attr) return getattr(self.stream, attr)
class DirectoryWatcherHandler(FileSystemEventHandler):
    """Watchdog handler that dispatches newly created task files to a worker.

    Cross-process exclusivity is enforced with a sibling ``<task>.lock``
    file (filelock), so several worker processes can watch the same
    directory without processing the same task twice.
    """

    def __init__(self, worker):
        self.worker = worker  # the Worker that will execute the task

    def on_any_event(self, event):
        """React to 'created' events on regular files in the task directory.

        Directory events and the ``.lock`` companion files are ignored.
        If the per-task lock cannot be acquired within 1s, another
        process owns the task and we back off silently.
        """
        if event.is_directory:
            return None
        elif event.event_type == 'created':
            task_uuid = os.path.basename(event.src_path)
            if task_uuid.endswith(".lock"):
                return  # our own lock file appearing; not a task
            lock_fname = os.path.join(TASKS_IN_DIR, task_uuid)
            lock = FileLock("{}.lock".format(lock_fname))
            try:
                with lock.acquire(timeout=1):
                    print("WATCHER: Acquired lock for {}.".format(task_uuid))
                    # Skip tasks another process already claimed.
                    with open(lock_fname, 'r') as f:
                        if f.read() == "processing":
                            return
                    # BUG FIX: the original used `open(...).write(...)`,
                    # which leaks file handles and may delay the flush of
                    # the "processing" marker; context managers close and
                    # flush deterministically.
                    with open(lock_fname, "w") as f:
                        f.write("processing")
                    self.worker.process(task_uuid)
                    with open(lock_fname, "w") as f:
                        f.write("done")
                    # NOTE(review): the "done" marker is removed immediately
                    # below, so nothing can observe it — presumably kept for
                    # debugging; the .lock file itself is left behind. Confirm.
                    os.remove(lock_fname)
                print("WATCHER: Released lock for {}.".format(task_uuid))
            except Timeout:
                # Another process holds the lock; it will handle the task.
                return
            except Exception as e:
                print("WATCHER: Exception: {}".format(e))
            # BUG FIX: dropped the original `finally: lock.release()` —
            # the `with lock.acquire(...)` context manager already releases
            # the lock on exit, making the extra release redundant.
class DirectoryWatcher(object):
    """Runs a watchdog Observer over the worker's task directory.

    ``run`` blocks until the module-level RUNNING flag is cleared (or an
    error occurs), then shuts the observer thread down cleanly.
    """

    def __init__(self, worker):
        self.worker = worker
        self.observer = Observer()
        # Route filesystem events back to the owning worker.
        self.event_handler = DirectoryWatcherHandler(self.worker)

    def run(self):
        """Start watching ``worker.directory``; return when RUNNING goes False."""
        self.observer.schedule(self.event_handler, self.worker.directory, recursive=True)
        self.observer.start()
        try:
            while RUNNING:
                time.sleep(3)
            print("DirectoryWatcher: Stopped")
        except Exception as e:
            # BUG FIX: the original bare `except:` also swallowed
            # KeyboardInterrupt/SystemExit and discarded the actual error;
            # catch Exception and report what went wrong.
            print("DirectoryWatcher: Error: {}".format(e))
        finally:
            # Always release the observer thread, even on error.
            self.observer.stop()
            self.observer.join()
class Worker(object): class Worker(object):
def __init__(self): def __init__(self, tasks_dir):
self.stored_sys_stdout = None self.worker_name = None
self.stored_sys_stderr = None self.directory = tasks_dir
self.watcher = DirectoryWatcher(self)
def before_task(self, task_uuid): def before_task(self, task_uuid):
self.stored_sys_stdout = sys.stdout
self.stored_sys_stdout = sys.stdout
sys.stdout = Unbuffered(open(os.path.join(TASKS_BUF_DIR, task_uuid), 'w')) sys.stdout = Unbuffered(open(os.path.join(TASKS_BUF_DIR, task_uuid), 'w'))
sys.stderr = Unbuffered(open(os.path.join(TASKS_BUF_DIR, task_uuid), 'w')) sys.stderr = Unbuffered(open(os.path.join(TASKS_BUF_DIR, task_uuid), 'w'))
def after_task(self, task_uuid): def after_task(self, task_uuid):
sys.stdout = self.stored_sys_stdout sys.stdout = sys.__stdout__
sys.stdout = self.stored_sys_stderr sys.stderr = sys.__stderr__
def start(self): def start(self):
worker_id = multiprocessing.current_process()
self.worker_name = worker_id.name
print("Started worker {} (pid: {})".format(self.worker_name, worker_id.pid))
self.watcher.run()
def process(self, task_uuid):
app.app_context().push() app.app_context().push()
app.test_request_context().push() app.test_request_context().push()
worker_id = threading.get_ident() task = Task.query.filter_by(uuid=task_uuid).first()
print("Started worker {}".format(worker_id)) print("Worker {} received task: {}".format(self.worker_name, task.name))
while True:
print("Worker {} waiting for tasks...".format(worker_id))
task_uuid = q.get()
if task_uuid == POISON_PILL:
print("Killing worker {}".format(worker_id))
break
task = Task.query.filter_by(uuid=task_uuid).first()
print("Worker {} received task: {}".format(worker_id, task.name))
task.status = "STARTED" task.status = "STARTED"
db.session.add(task) db.session.add(task)
db.session.commit() db.session.commit()
print("Worker {} started task: {}".format(worker_id, task.name)) print("Worker {} started task: {}".format(self.worker_name, task.name))
task_func = dyn_import(task.func_name) task_func = dyn_import(task.func_name)
self.before_task(task.uuid) self.before_task(task.uuid)
task_func(*json.loads(task.args), **json.loads(task.kwargs)) task_func(*json.loads(task.args), **json.loads(task.kwargs))
self.after_task(task.uuid) self.after_task(task.uuid)
print("Worker {} finished task: {}".format(worker_id, task.name)) print("Worker {} finished task: {}".format(self.worker_name, task.name))
task.status = "DONE" task.status = "DONE"
db.session.add(task) db.session.add(task)
db.session.commit() db.session.commit()
RUNNING = True RUNNING = True
@@ -106,11 +161,11 @@ def main(workers_cnt=4):
app.test_request_context().push() app.test_request_context().push()
init_db(app) init_db(app)
thread_pool = [] worker_pool = []
for w in range(workers_cnt): for w in range(workers_cnt):
worker = Worker() worker = Worker(TASKS_IN_DIR)
t = threading.Thread(target=worker.start, args=()) t = multiprocessing.Process(target=worker.start, args=())
thread_pool.append(t) worker_pool.append(t)
t.start() t.start()
print("Started main loop") print("Started main loop")
@@ -118,17 +173,19 @@ def main(workers_cnt=4):
while RUNNING: while RUNNING:
not_started_tasks = Task.query.filter_by(status="NOT_STARTED").all() not_started_tasks = Task.query.filter_by(status="NOT_STARTED").all()
for task in not_started_tasks: for task in not_started_tasks:
lock_fname = os.path.join(TASKS_IN_DIR, task.uuid)
lock = FileLock("{}.lock".format(lock_fname))
with lock:
open(lock_fname, "a").write("queued")
task.status = "QUEUED" task.status = "QUEUED"
db.session.add(task) db.session.add(task)
db.session.commit() db.session.commit()
q.put(task.uuid)
sleep(1) sleep(1)
print("exiting main loop") print("exiting main loop")
for t in thread_pool: for t in worker_pool:
q.put(POISON_PILL)
t.join() t.join()

View File

@@ -1,9 +1,12 @@
click==7.1.1 click==7.1.1
filelock==3.0.12
Flask==1.1.1 Flask==1.1.1
Flask-SQLAlchemy==2.4.1 Flask-SQLAlchemy==2.4.1
itsdangerous==1.1.0 itsdangerous==1.1.0
Jinja2==2.11.1 Jinja2==2.11.1
MarkupSafe==1.1.1 MarkupSafe==1.1.1
pathtools==0.1.2
pkg-resources==0.0.0 pkg-resources==0.0.0
SQLAlchemy==1.3.15 SQLAlchemy==1.3.15
watchdog==0.10.2
Werkzeug==1.0.0 Werkzeug==1.0.0