fix after runobeat

This commit is contained in:
Daniel Tsvetkov 2020-03-24 11:55:54 +01:00
parent a486cd7f66
commit 4a8d8da076
3 changed files with 23 additions and 10 deletions

View File

@ -1,5 +1,5 @@
.idea
venv
*.pyc
data/db.sqlite
data/
__pycache__

View File

@ -14,8 +14,11 @@ from oshipka.webapp import test_bp, oshipka_bp
TASKS = {}
def register_task(task_name, task_func, *args, **kwargs):
TASKS[task_name] = task_func
def register_task(task_name, task_func, package, *args, **kwargs):
TASKS[task_name] = {
"func": task_func,
"package": package,
}
def stateful_task(*args, **kwargs):
@ -25,7 +28,7 @@ def stateful_task(*args, **kwargs):
time.sleep(1)
register_task("stateful_task", stateful_task)
register_task("stateful_task", stateful_task, "oshipka.webapp.async_routes.{}")
class Task(db.Model):
@ -71,7 +74,9 @@ def tail(filename):
yield "data: {}\n\n".format(line)
def worker_start_task(task_name, func_name, task_kwargs):
def worker_start_task(task_name, task_kwargs):
reg_task = TASKS.get(task_name)
func_name = reg_task.get('package').format(reg_task.get('func').__name__)
uuid = str(uuid4())
task = Task(name=task_name,
uuid=uuid,
@ -86,9 +91,8 @@ def worker_start_task(task_name, func_name, task_kwargs):
@test_bp.route('/tasks/<task_name>/start', methods=['GET', 'POST'])
def start_task(task_name):
task_kwargs = {k: v for k, v in request.form.items() if k != 'csrf_token'}
func_name = "oshipka.webapp.async_routes.{}".format(TASKS.get(task_name).__name__)
async_task_id = worker_start_task(task_name, func_name, task_kwargs)
return redirect(url_for('test_bp.get_task_status', task_uuid=async_task_id))
task_uuid = worker_start_task(task_name, task_kwargs)
return redirect(url_for('test_bp.get_task_status', task_uuid=task_uuid))
def get_task_ctx(task_uuid):

View File

@ -73,11 +73,17 @@ class DirectoryWatcherHandler(FileSystemEventHandler):
open(lock_fname, "w").write("done")
os.remove(lock_fname)
print("WATCHER: Released lock for {}.".format(task_uuid))
except Timeout as e:
except Timeout:
return
except OSError:
return
except Exception as e:
print("WATCHER: Exception: {}".format(e))
finally:
try:
os.remove(lock_fname)
except Exception:
...
lock.release()
@ -135,7 +141,10 @@ class Worker(object):
print("Worker {} started task: {}".format(self.worker_name, task.name))
task_func = dyn_import(task.func_name)
self.before_task(task.uuid)
try:
task_func(*json.loads(task.args), **json.loads(task.kwargs))
except Exception as e:
print("WORKER: Exception: {}".format(e))
self.after_task(task.uuid)
print("Worker {} finished task: {}".format(self.worker_name, task.name))