diff --git a/oshipka/webapp/async_routes.py b/oshipka/webapp/async_routes.py index 4deb3ef..6a7b97c 100644 --- a/oshipka/webapp/async_routes.py +++ b/oshipka/webapp/async_routes.py @@ -1,18 +1,16 @@ -import itertools import json import os import subprocess import time from uuid import uuid4 +from config import TASKS_BUF_DIR from flask import render_template, request, Response, redirect, url_for, jsonify from flask_table import Table, LinkCol, Col from oshipka.persistance import db from oshipka.webapp import test_bp, oshipka_bp -from config import TASKS_BUF_DIR - TASKS = {} @@ -20,17 +18,8 @@ def register_task(task_name, task_func, *args, **kwargs): TASKS[task_name] = task_func -def stateless_task(): - for i, c in enumerate(itertools.cycle('\|/-')): - yield "data: %s %d\n\n" % (c, i) - time.sleep(.1) # an artificial delay - - -register_task("stateless_task", stateless_task) - - def stateful_task(*args, **kwargs): - n = 100 + n = 5 for i in range(0, n + 1): print(i) time.sleep(1) @@ -79,9 +68,7 @@ def tail(filename): ) for line in iter(process.stdout.readline, b''): line = line.strip() - # print("sending s_tail: {}".format(line)) yield "data: {}\n\n".format(line) - # print("sent s_tail: {}".format((line))) def worker_start_task(task_name, func_name, task_kwargs): @@ -127,6 +114,5 @@ def stream(task_uuid): task = Task.query.filter_by(uuid=task_uuid).first() if not task: return jsonify({"error": "no task with uuid {}".format(task_uuid)}), 404 - # return Response(stateless_task(), content_type='text/event-stream') return Response(tail(os.path.join(TASKS_BUF_DIR, task_uuid)), content_type='text/event-stream') return jsonify({"error": "Request has to contain 'Accept: text/event-stream' header"}), 400 diff --git a/oshipka/worker.py b/oshipka/worker.py index aadebc8..0bc3541 100644 --- a/oshipka/worker.py +++ b/oshipka/worker.py @@ -1,20 +1,25 @@ import json +import multiprocessing import os import queue import signal +import time + import sys import threading +from copy import deepcopy from importlib import import_module from time import sleep +from filelock import FileLock, Timeout + from oshipka.persistance import init_db, db from oshipka.webapp import app from oshipka.webapp.async_routes import Task +from watchdog.observers import Observer +from watchdog.events import FileSystemEventHandler -from config import TASKS_BUF_DIR - -q = queue.Queue() -POISON_PILL = "-.-.-.-" +from config import TASKS_BUF_DIR, TASKS_IN_DIR def dyn_import(name): @@ -42,51 +47,101 @@ class Unbuffered(object): return getattr(self.stream, attr) +class DirectoryWatcherHandler(FileSystemEventHandler): + def __init__(self, worker): + self.worker = worker + + def on_any_event(self, event): + if event.is_directory: + return None + + elif event.event_type == 'created': + # Take any action here when a file is first created. + task_uuid = os.path.basename(event.src_path) + if task_uuid.endswith(".lock"): + return + lock_fname = os.path.join(TASKS_IN_DIR, task_uuid) + lock = FileLock("{}.lock".format(lock_fname)) + try: + with lock.acquire(timeout=1): + print("WATCHER: Acquired lock for {}.".format(task_uuid)) + with open(lock_fname, 'r') as f: + if f.read() == "processing": + return + open(lock_fname, "w").write("processing") + self.worker.process(task_uuid) + open(lock_fname, "w").write("done") + os.remove(lock_fname) + print("WATCHER: Released lock for {}.".format(task_uuid)) + except Timeout as e: + return + except Exception as e: + print("WATCHER: Exception: {}".format(e)) + finally: + lock.release() + + +class DirectoryWatcher(object): + def __init__(self, worker): + self.worker = worker + self.observer = Observer() + self.event_handler = DirectoryWatcherHandler(self.worker) + + def run(self): + self.observer.schedule(self.event_handler, self.worker.directory, recursive=True) + self.observer.start() + try: + while RUNNING: + time.sleep(3) + print("DirectoryWatcher: Stopped") + except: + print("DirectoryWatcher: Error") + self.observer.stop() + self.observer.join() + + class Worker(object): - def __init__(self): - self.stored_sys_stdout = None - self.stored_sys_stderr = None + def __init__(self, tasks_dir): + self.worker_name = None + self.directory = tasks_dir + self.watcher = DirectoryWatcher(self) def before_task(self, task_uuid): - self.stored_sys_stdout = sys.stdout - self.stored_sys_stdout = sys.stdout - sys.stdout = Unbuffered(open(os.path.join(TASKS_BUF_DIR, task_uuid), 'w')) sys.stderr = Unbuffered(open(os.path.join(TASKS_BUF_DIR, task_uuid), 'w')) def after_task(self, task_uuid): - sys.stdout = self.stored_sys_stdout - sys.stdout = self.stored_sys_stderr + sys.stdout = sys.__stdout__ + sys.stderr = sys.__stderr__ def start(self): + worker_id = multiprocessing.current_process() + self.worker_name = worker_id.name + + print("Started worker {} (pid: {})".format(self.worker_name, worker_id.pid)) + self.watcher.run() + + def process(self, task_uuid): app.app_context().push() app.test_request_context().push() - worker_id = threading.get_ident() - print("Started worker {}".format(worker_id)) - while True: - print("Worker {} waiting for tasks...".format(worker_id)) - task_uuid = q.get() - if task_uuid == POISON_PILL: - print("Killing worker {}".format(worker_id)) - break - task = Task.query.filter_by(uuid=task_uuid).first() - print("Worker {} received task: {}".format(worker_id, task.name)) + task = Task.query.filter_by(uuid=task_uuid).first() + print("Worker {} received task: {}".format(self.worker_name, task.name)) - task.status = "STARTED" - db.session.add(task) - db.session.commit() + task.status = "STARTED" + db.session.add(task) + db.session.commit() - print("Worker {} started task: {}".format(worker_id, task.name)) - task_func = dyn_import(task.func_name) - self.before_task(task.uuid) - task_func(*json.loads(task.args), **json.loads(task.kwargs)) - self.after_task(task.uuid) - print("Worker {} finished task: {}".format(worker_id, task.name)) + print("Worker {} started task: {}".format(self.worker_name, task.name)) + task_func = dyn_import(task.func_name) + self.before_task(task.uuid) + task_func(*json.loads(task.args), **json.loads(task.kwargs)) + self.after_task(task.uuid) + print("Worker {} finished task: {}".format(self.worker_name, task.name)) - task.status = "DONE" - db.session.add(task) - db.session.commit() + task.status = "DONE" + db.session.add(task) + db.session.commit() RUNNING = True @@ -106,11 +161,11 @@ def main(workers_cnt=4): app.test_request_context().push() init_db(app) - thread_pool = [] + worker_pool = [] for w in range(workers_cnt): - worker = Worker() - t = threading.Thread(target=worker.start, args=()) - thread_pool.append(t) + worker = Worker(TASKS_IN_DIR) + t = multiprocessing.Process(target=worker.start, args=()) + worker_pool.append(t) t.start() print("Started main loop") @@ -118,17 +173,19 @@ def main(workers_cnt=4): while RUNNING: not_started_tasks = Task.query.filter_by(status="NOT_STARTED").all() for task in not_started_tasks: + lock_fname = os.path.join(TASKS_IN_DIR, task.uuid) + lock = FileLock("{}.lock".format(lock_fname)) + with lock: + open(lock_fname, "a").write("queued") + task.status = "QUEUED" db.session.add(task) db.session.commit() - q.put(task.uuid) - sleep(1) print("exiting main loop") - for t in thread_pool: - q.put(POISON_PILL) + for t in worker_pool: t.join() diff --git a/requirements.txt b/requirements.txt index bca6661..8a09746 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,9 +1,12 @@ click==7.1.1 +filelock==3.0.12 Flask==1.1.1 Flask-SQLAlchemy==2.4.1 itsdangerous==1.1.0 Jinja2==2.11.1 MarkupSafe==1.1.1 +pathtools==0.1.2 pkg-resources==0.0.0 SQLAlchemy==1.3.15 +watchdog==0.10.2 Werkzeug==1.0.0