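"""Task worker for oshipka.

Watches TASKS_IN_DIR for task marker files, claims each one with a file lock, runs the
task's registered function, and streams its output to a per-task buffer file in
TASKS_BUF_DIR while recording status transitions in the database. main() spawns the
worker processes and queues NOT_STARTED tasks found in the Task table.
"""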
import json
import multiprocessing
import os
import queue
import signal
import sys
import threading
import time
from copy import deepcopy
from importlib import import_module
from time import sleep

from filelock import FileLock, Timeout

from oshipka.persistance import init_db, db
from oshipka.webapp import app
from oshipka.webapp.async_routes import Task
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler

from config import TASKS_BUF_DIR, TASKS_IN_DIR


def dyn_import(name):
    """Resolve a dotted path such as "package.module.func" to the callable it names."""
    p, m = name.rsplit('.', 1)

    mod = import_module(p)
    func = getattr(mod, m)

    return func


class Unbuffered(object):
    """Stream wrapper that flushes after every write, so task output appears immediately."""

    def __init__(self, stream):
        self.stream = stream

    def write(self, data):
        self.stream.write(data)
        self.stream.flush()

    def writelines(self, datas):
        self.stream.writelines(datas)
        self.stream.flush()

    def __getattr__(self, attr):
        return getattr(self.stream, attr)


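# Each task is announced as a file named after its UUID in TASKS_IN_DIR. The handler
# below reacts to file-creation events, takes a filelock ("<uuid>.lock") so that only
# one worker process claims the task, and tracks progress by rewriting the task file
# ("queued" -> "processing" -> "done") before removing it.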
class DirectoryWatcherHandler(FileSystemEventHandler):
    def __init__(self, worker):
        self.worker = worker

    def on_any_event(self, event):
        if event.is_directory:
            return None

        elif event.event_type == 'created':
            # A new task file appeared; its basename is the task UUID.
            task_uuid = os.path.basename(event.src_path)
            if task_uuid.endswith(".lock"):
                return
            lock_fname = os.path.join(TASKS_IN_DIR, task_uuid)
            lock = FileLock("{}.lock".format(lock_fname))
            try:
                with lock.acquire(timeout=1):
                    print("WATCHER: Acquired lock for {}.".format(task_uuid))
                    with open(lock_fname, 'r') as f:
                        # Another worker already claimed this task.
                        if f.read() == "processing":
                            return
                    with open(lock_fname, "w") as f:
                        f.write("processing")
                    self.worker.process(task_uuid)
                    with open(lock_fname, "w") as f:
                        f.write("done")
                    os.remove(lock_fname)
                    print("WATCHER: Released lock for {}.".format(task_uuid))
            except Timeout:
                return
            except OSError:
                return
            except Exception as e:
                print("WATCHER: Exception: {}".format(e))
            finally:
                # Best-effort cleanup: the task file may already be gone.
                try:
                    os.remove(lock_fname)
                except Exception:
                    ...
                lock.release()


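# Thin wrapper around a watchdog Observer: it schedules the handler on the worker's
# task directory and keeps the observer alive until the RUNNING flag is cleared.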
class DirectoryWatcher(object):
    def __init__(self, worker):
        self.worker = worker
        self.observer = Observer()
        self.event_handler = DirectoryWatcherHandler(self.worker)

    def run(self):
        self.observer.schedule(self.event_handler, self.worker.directory, recursive=True)
        self.observer.start()
        try:
            while RUNNING:
                time.sleep(3)
            print("DirectoryWatcher: Stopped")
        except Exception:
            print("DirectoryWatcher: Error")
        finally:
            # Always stop the observer, otherwise join() would block forever.
            self.observer.stop()
            self.observer.join()


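# A Worker runs inside its own process (see main()). It watches TASKS_IN_DIR for task
# files; for each claimed task it loads the Task row from the database, redirects
# stdout/stderr to a per-task buffer file in TASKS_BUF_DIR, runs the task function,
# and records the status transitions STARTED -> DONE.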
class Worker(object):
    def __init__(self, tasks_dir):
        self.worker_name = None
        self.directory = tasks_dir
        self.watcher = DirectoryWatcher(self)

    def before_task(self, task_uuid):
        # Send both streams to the same per-task buffer file so anything the task
        # prints can be streamed back to the client.
        buffered = Unbuffered(open(os.path.join(TASKS_BUF_DIR, task_uuid), 'w'))
        sys.stdout = buffered
        sys.stderr = buffered

    def after_task(self, task_uuid):
        sys.stdout = sys.__stdout__
        sys.stderr = sys.__stderr__

    def start(self):
        worker_id = multiprocessing.current_process()
        self.worker_name = worker_id.name

        print("Started worker {} (pid: {})".format(self.worker_name, worker_id.pid))
        self.watcher.run()

    def process(self, task_uuid):
        # Each worker process needs its own Flask application/request context
        # before it can use the database session.
        app.app_context().push()
        app.test_request_context().push()

        task = Task.query.filter_by(uuid=task_uuid).first()
        if task is None:
            print("Worker {} found no task with uuid {}".format(self.worker_name, task_uuid))
            return
        print("Worker {} received task: {}".format(self.worker_name, task.name))

        task.status = "STARTED"
        db.session.add(task)
        db.session.commit()

        print("Worker {} started task: {}".format(self.worker_name, task.name))
        task_func = dyn_import(task.func_name)
        self.before_task(task.uuid)
        try:
            task_func(*json.loads(task.args), **json.loads(task.kwargs))
        except Exception as e:
            print("WORKER: Exception: {}".format(e))
        self.after_task(task.uuid)
        print("Worker {} finished task: {}".format(self.worker_name, task.name))

        task.status = "DONE"
        db.session.add(task)
        db.session.commit()


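# Module-level run flag, cleared by the SIGINT handler so the watcher loops and the
# main polling loop can wind down gracefully.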
RUNNING = True


def signal_handler(sig, frame):
    global RUNNING
    print("\nReceived kill signal, ending the workers gracefully...")
    RUNNING = False


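# main() is the producer side: it spawns the worker processes, then polls the Task
# table for NOT_STARTED rows, touches the matching file in TASKS_IN_DIR (which the
# workers' directory watchers pick up), and marks each row QUEUED.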
def main(workers_cnt=4):
    global RUNNING

    signal.signal(signal.SIGINT, signal_handler)
    app.app_context().push()
    app.test_request_context().push()
    # init_db(app)

    worker_pool = []
    for w in range(workers_cnt):
        worker = Worker(TASKS_IN_DIR)
        t = multiprocessing.Process(target=worker.start, args=())
        worker_pool.append(t)
        t.start()

    print("Started main loop")

    while RUNNING:
        not_started_tasks = Task.query.filter_by(status="NOT_STARTED").all()
        for task in not_started_tasks:
            lock_fname = os.path.join(TASKS_IN_DIR, task.uuid)
            lock = FileLock("{}.lock".format(lock_fname))
            with lock:
                # Creating/appending to the task file is the event the workers react to.
                with open(lock_fname, "a") as f:
                    f.write("queued")

                task.status = "QUEUED"
                db.session.add(task)
                db.session.commit()

        sleep(1)

    print("exiting main loop")
    for t in worker_pool:
        t.join()


if __name__ == "__main__":
    main()