From 4a8d8da07622475378de034f6cf922694dfbef62 Mon Sep 17 00:00:00 2001 From: Daniel Tsvetkov Date: Tue, 24 Mar 2020 11:55:54 +0100 Subject: [PATCH] fix after runobeat --- bootstrap/.gitignore | 2 +- oshipka/webapp/async_routes.py | 18 +++++++++++------- oshipka/worker.py | 13 +++++++++++-- 3 files changed, 23 insertions(+), 10 deletions(-) diff --git a/bootstrap/.gitignore b/bootstrap/.gitignore index 6c40623..4b47bbc 100644 --- a/bootstrap/.gitignore +++ b/bootstrap/.gitignore @@ -1,5 +1,5 @@ .idea venv *.pyc -data/db.sqlite +data/ __pycache__ \ No newline at end of file diff --git a/oshipka/webapp/async_routes.py b/oshipka/webapp/async_routes.py index 6a7b97c..5f45c56 100644 --- a/oshipka/webapp/async_routes.py +++ b/oshipka/webapp/async_routes.py @@ -14,8 +14,11 @@ from oshipka.webapp import test_bp, oshipka_bp TASKS = {} -def register_task(task_name, task_func, *args, **kwargs): - TASKS[task_name] = task_func +def register_task(task_name, task_func, package, *args, **kwargs): + TASKS[task_name] = { + "func": task_func, + "package": package, + } def stateful_task(*args, **kwargs): @@ -25,7 +28,7 @@ def stateful_task(*args, **kwargs): time.sleep(1) -register_task("stateful_task", stateful_task) +register_task("stateful_task", stateful_task, "oshipka.webapp.async_routes.{}") class Task(db.Model): @@ -71,7 +74,9 @@ def tail(filename): yield "data: {}\n\n".format(line) -def worker_start_task(task_name, func_name, task_kwargs): +def worker_start_task(task_name, task_kwargs): + reg_task = TASKS.get(task_name) + func_name = reg_task.get('package').format(reg_task.get('func').__name__) uuid = str(uuid4()) task = Task(name=task_name, uuid=uuid, @@ -86,9 +91,8 @@ def worker_start_task(task_name, func_name, task_kwargs): @test_bp.route('/tasks//start', methods=['GET', 'POST']) def start_task(task_name): task_kwargs = {k: v for k, v in request.form.items() if k != 'csrf_token'} - func_name = "oshipka.webapp.async_routes.{}".format(TASKS.get(task_name).__name__) - async_task_id = worker_start_task(task_name, func_name, task_kwargs) - return redirect(url_for('test_bp.get_task_status', task_uuid=async_task_id)) + task_uuid = worker_start_task(task_name, task_kwargs) + return redirect(url_for('test_bp.get_task_status', task_uuid=task_uuid)) def get_task_ctx(task_uuid): diff --git a/oshipka/worker.py b/oshipka/worker.py index 0bc3541..dfb0b7f 100644 --- a/oshipka/worker.py +++ b/oshipka/worker.py @@ -73,11 +73,17 @@ class DirectoryWatcherHandler(FileSystemEventHandler): open(lock_fname, "w").write("done") os.remove(lock_fname) print("WATCHER: Released lock for {}.".format(task_uuid)) - except Timeout as e: + except Timeout: + return + except OSError: return except Exception as e: print("WATCHER: Exception: {}".format(e)) finally: + try: + os.remove(lock_fname) + except Exception: + ... lock.release() @@ -135,7 +141,10 @@ class Worker(object): print("Worker {} started task: {}".format(self.worker_name, task.name)) task_func = dyn_import(task.func_name) self.before_task(task.uuid) - task_func(*json.loads(task.args), **json.loads(task.kwargs)) + try: + task_func(*json.loads(task.args), **json.loads(task.kwargs)) + except Exception as e: + print("WORKER: Exception: {}".format(e)) self.after_task(task.uuid) print("Worker {} finished task: {}".format(self.worker_name, task.name))