oshipka/oshipka/worker.py

203 lines
5.7 KiB
Python

import json
import multiprocessing
import os
import queue
import signal
import time
import sys
import threading
from copy import deepcopy
from importlib import import_module
from time import sleep
from filelock import FileLock, Timeout
from oshipka.persistance import init_db, db
from oshipka.webapp import app
from oshipka.webapp.async_routes import Task
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
from config import TASKS_BUF_DIR, TASKS_IN_DIR
def dyn_import(name):
    """Resolve a dotted path such as ``"package.module.attr"`` to an object.

    The text before the last dot is imported as a module and the text after
    it is looked up as an attribute of that module.

    Raises ``ValueError`` when *name* contains no dot, ``ImportError`` when
    the module cannot be imported and ``AttributeError`` when the module has
    no such attribute.
    """
    module_path, attr_name = name.rsplit('.', 1)
    return getattr(import_module(module_path), attr_name)
class Unbuffered(object):
    """Proxy around a stream that flushes it after every write.

    ``write`` and ``writelines`` forward to the wrapped stream and flush it
    immediately; every other attribute is delegated to the stream unchanged.
    """

    def __init__(self, stream):
        self.stream = stream

    def write(self, data):
        """Write *data* to the wrapped stream, then flush it."""
        target = self.stream
        target.write(data)
        target.flush()

    def writelines(self, datas):
        """Write every item of *datas* to the wrapped stream, then flush it."""
        target = self.stream
        target.writelines(datas)
        target.flush()

    def __getattr__(self, attr):
        # Python only calls __getattr__ when normal lookup on the proxy
        # fails, so everything not defined here falls through to the stream.
        return getattr(self.stream, attr)
class DirectoryWatcherHandler(FileSystemEventHandler):
    """Watchdog handler that hands newly created task files to a worker.

    The base name of each created file is used as the task uuid. A file lock
    named ``<task file>.lock`` decides which handler gets to process the
    task; the task file's content ("processing" / "done") records progress.
    """

    def __init__(self, worker):
        self.worker = worker

    def on_any_event(self, event):
        """Claim and process the task behind a file-creation event.

        Directory events, events other than 'created', and the ``.lock``
        files themselves are ignored. Lock timeouts and ``OSError`` end the
        attempt silently; any other exception is printed.
        """
        if event.is_directory or event.event_type != 'created':
            return
        task_uuid = os.path.basename(event.src_path)
        if task_uuid.endswith(".lock"):
            # Lock files live in the watched directory too; they are not tasks.
            return
        task_path = os.path.join(TASKS_IN_DIR, task_uuid)
        lock = FileLock("{}.lock".format(task_path))
        try:
            with lock.acquire(timeout=1):
                print("WATCHER: Acquired lock for {}.".format(task_uuid))
                with open(task_path, 'r') as f:
                    state = f.read()
                if state == "processing":
                    return
                # Close each handle explicitly so the marker is flushed to
                # disk before the worker starts / before the file is removed.
                with open(task_path, "w") as f:
                    f.write("processing")
                self.worker.process(task_uuid)
                with open(task_path, "w") as f:
                    f.write("done")
                os.remove(task_path)
                print("WATCHER: Released lock for {}.".format(task_uuid))
        except Timeout:
            return
        except OSError:
            return
        except Exception as e:
            print("WATCHER: Exception: {}".format(e))
        finally:
            # NOTE(review): this cleanup runs on every exit path, including
            # when the lock was never acquired (Timeout) or the file already
            # said "processing" -- confirm that removing the task file in
            # those cases is intended.
            try:
                os.remove(task_path)
            except Exception:
                # Best effort: the file is usually already gone by now.
                pass
            lock.release()
class DirectoryWatcher(object):
    """Runs a watchdog observer over a worker's task directory."""

    def __init__(self, worker):
        self.worker = worker
        self.observer = Observer()
        self.event_handler = DirectoryWatcherHandler(self.worker)

    def run(self):
        """Watch ``worker.directory`` until the module-level RUNNING flag clears.

        Blocks the calling thread, polling RUNNING every three seconds. The
        observer is stopped and joined on every exit path.
        """
        self.observer.schedule(self.event_handler, self.worker.directory, recursive=True)
        self.observer.start()
        try:
            while RUNNING:
                time.sleep(3)
            print("DirectoryWatcher: Stopped")
        except (Exception, KeyboardInterrupt) as e:
            # Named exceptions instead of a bare ``except`` so SystemExit is
            # no longer swallowed; the error itself is now reported.
            print("DirectoryWatcher: Error: {!r}".format(e))
        finally:
            # Always shut the observer thread down, whatever ended the loop.
            self.observer.stop()
            self.observer.join()
class Worker(object):
    """Executes tasks whose files appear in a watched directory.

    A worker owns a ``DirectoryWatcher`` over ``tasks_dir``; the watcher's
    event handler calls ``process`` with the uuid of each task it claims.
    """

    def __init__(self, tasks_dir):
        self.worker_name = None  # filled in by start() from the process name
        self.directory = tasks_dir
        self.watcher = DirectoryWatcher(self)
        # Open buffer file that receives the running task's output, if any.
        self._task_buf = None

    def before_task(self, task_uuid):
        """Redirect stdout and stderr into the task's buffer file.

        The file is ``TASKS_BUF_DIR/<task_uuid>`` and is truncated on open.
        """
        # A single shared handle: two separate 'w' handles on the same path
        # keep independent offsets and overwrite each other's output.
        self._task_buf = open(os.path.join(TASKS_BUF_DIR, task_uuid), 'w')
        stream = Unbuffered(self._task_buf)
        sys.stdout = stream
        sys.stderr = stream

    def after_task(self, task_uuid):
        """Restore the real stdout/stderr and close the task's buffer file."""
        sys.stdout = sys.__stdout__
        sys.stderr = sys.__stderr__
        buf = getattr(self, "_task_buf", None)
        self._task_buf = None
        if buf is not None:
            buf.close()

    def start(self):
        """Record the current process name and run the directory watcher.

        Blocks until the watcher's ``run`` returns.
        """
        worker_id = multiprocessing.current_process()
        self.worker_name = worker_id.name
        print("Started worker {} (pid: {})".format(self.worker_name, worker_id.pid))
        self.watcher.run()

    def process(self, task_uuid):
        """Run the task stored under *task_uuid* and record its status.

        The task row is marked "STARTED", its ``func_name`` is imported with
        ``dyn_import`` and called with the JSON-decoded ``args``/``kwargs``,
        and the row is then marked "DONE". An exception raised by the task
        function is printed (into the task's output buffer) and the task is
        still marked "DONE".
        """
        # Enter the Flask contexts with ``with`` so they are popped again
        # after each task rather than piling up on the calling thread.
        with app.app_context(), app.test_request_context():
            task = Task.query.filter_by(uuid=task_uuid).first()
            if task is None:
                print("Worker {} found no task with uuid: {}".format(self.worker_name, task_uuid))
                return
            print("Worker {} received task: {}".format(self.worker_name, task.name))
            task.status = "STARTED"
            db.session.add(task)
            db.session.commit()
            print("Worker {} started task: {}".format(self.worker_name, task.name))
            task_func = dyn_import(task.func_name)
            self.before_task(task.uuid)
            try:
                task_func(*json.loads(task.args), **json.loads(task.kwargs))
            except Exception as e:
                print("WORKER: Exception: {}".format(e))
            finally:
                # Restore stdout/stderr even if the task raised something
                # that is not an Exception (e.g. SystemExit).
                self.after_task(task.uuid)
            print("Worker {} finished task: {}".format(self.worker_name, task.name))
            task.status = "DONE"
            db.session.add(task)
            db.session.commit()
# Module-wide run flag; the polling loops in this file keep going while it
# is true.
RUNNING = True


def signal_handler(signal, frame):
    """Signal callback that asks the polling loops to stop.

    Prints a notice and clears the module-level RUNNING flag. Both arguments
    are ignored; note that the first one shadows the ``signal`` module inside
    this function.
    """
    global RUNNING
    notice = "\nReceived kill signal, ending the workers gracefully..."
    print(notice)
    RUNNING = False
def main(workers_cnt=4):
    """Start the worker processes and dispatch NOT_STARTED tasks to them.

    Installs ``signal_handler`` for SIGINT, starts *workers_cnt* processes
    that each run ``Worker.start``, then polls the database once per second
    while RUNNING is true. For every task with status "NOT_STARTED" a file
    named after the task uuid is written to ``TASKS_IN_DIR`` and the task is
    marked "QUEUED". After the loop ends the worker processes are joined.
    """
    signal.signal(signal.SIGINT, signal_handler)
    app.app_context().push()
    app.test_request_context().push()
    # init_db(app)
    worker_pool = []
    for _ in range(workers_cnt):
        worker = Worker(TASKS_IN_DIR)
        proc = multiprocessing.Process(target=worker.start, args=())
        worker_pool.append(proc)
        proc.start()
    print("Started main loop")
    while RUNNING:
        not_started_tasks = Task.query.filter_by(status="NOT_STARTED").all()
        for task in not_started_tasks:
            task_path = os.path.join(TASKS_IN_DIR, task.uuid)
            lock = FileLock("{}.lock".format(task_path))
            with lock:
                # Close the handle explicitly so "queued" is on disk before
                # the lock is released.
                with open(task_path, "a") as f:
                    f.write("queued")
                # Commit QUEUED while the lock is still held, so a worker
                # that claims the file cannot record STARTED first and then
                # have it overwritten.
                task.status = "QUEUED"
                db.session.add(task)
                db.session.commit()
        sleep(1)
    print("exiting main loop")
    for proc in worker_pool:
        proc.join()
# Script entry point: run the dispatcher with the default number of workers.
if __name__ == "__main__":
    main()