oshipka/oshipka/webapp/views.py

237 lines
7.9 KiB
Python

from functools import wraps
import inflect
from flask import flash, render_template, redirect, request, url_for, jsonify
from oshipka.persistance import db
from oshipka.util.strings import camel_case_to_snake_case
# Registry of ModelView objects, keyed by each view's ``model_name``.
# ModelView.__init__ adds itself here and default_render_func exposes the
# whole mapping to templates as ``model_views``.
MODEL_VIEWS = {}
def default_get_args_func(view_context):
    """Store the current request's ``request.args`` as the context's serialized args."""
    query_args = request.args
    view_context.serialized_args = query_args
def default_get_form_func(vc):
    """Populate the view context from the submitted form.

    * ``vc.redirect_next`` receives the form's ``_next`` value (or ``None``).
    * ``vc.serialized_args`` receives every form field whose name does not
      start with an underscore.
    * Fields whose name ends in ``_id`` are dropped when their value is the
      empty string and converted with ``int()`` otherwise.
    """
    form = request.form
    vc.redirect_next = form.get('_next')
    vc.serialized_args = {
        name: value
        for name, value in dict(form).items()
        if not name.startswith("_")
    }

    # Removal is deferred: deleting keys while iterating the dict would raise.
    blank_ids = []
    for name, value in vc.serialized_args.items():
        if not name.endswith('_id'):
            continue
        if value == '':
            blank_ids.append(name)
        else:
            vc.serialized_args[name] = int(value)

    for name in blank_ids:
        del vc.serialized_args[name]
def default_jsonify_func(vc):
    """Build the JSON response for the instances held by the view context.

    When ``vc.instances`` is a list (or any ``list`` subclass) the response is
    a JSON array with one ``serialize()`` result per instance; otherwise
    ``vc.instances`` is treated as a single object and its own ``serialize()``
    result is returned.

    :param vc: view context whose ``instances`` attribute is serialized.
    :return: whatever ``flask.jsonify`` returns for the serialized payload.
    """
    instances = vc.instances
    # isinstance() rather than an exact type check, so list subclasses are
    # serialized element-wise instead of being mistaken for a single instance
    # (which would fail on the missing ``serialize`` attribute).
    if isinstance(instances, list):
        return jsonify([instance.serialize() for instance in instances])
    return jsonify(instances.serialize())
def default_redirect_func(vc):
    """Redirect to the first available target.

    Preference order: ``vc.redirect_next``, then the request's referrer, then
    the ``home`` endpoint.
    """
    # ``or`` chaining keeps url_for('home') lazy: it is only resolved when
    # both earlier candidates are falsy.
    target = vc.redirect_next or request.referrer or url_for('home')
    return redirect(target)
def default_get_func(vc):
    """Load the instance(s) addressed by the ``uuid`` URL argument.

    An all-digit value is matched against the model's ``id`` column; anything
    else (including a missing value) is matched against ``uuid``.  The result
    list is stored in ``vc.instances``; when it is empty a message is flashed.
    """
    model = vc.model_view.model
    uuid = vc.url_args.get('uuid')

    lookup = {'id': uuid} if uuid and uuid.isdigit() else {'uuid': uuid}
    vc.instances = model.query.filter_by(**lookup).all()

    if vc.instances:
        return
    flash("No {}:{}".format(vc.model_view.model_name, uuid))
def default_list_args_get_func(vc):
    """Store the current request's ``request.args`` on ``vc.serialized_args``."""
    list_args = request.args
    vc.serialized_args = list_args
def default_list_func(vc):
    """Load every row of the view's model into ``vc.instances``."""
    model = vc.model_view.model
    everything = model.query.all()
    vc.instances = everything
def default_search_func(vc):
    """Run the model's ``search_query`` with the ``q`` argument, if supported.

    Models without a ``search_query`` attribute leave ``vc.instances``
    untouched.
    """
    q = vc.serialized_args.get('q')
    model = vc.model_view.model
    if not hasattr(model, 'search_query'):
        return
    # NOTE(review): when ``q`` is absent the formatted term is the text
    # "None" -- confirm whether that is intended.
    term = "{q}".format(q=q)
    vc.instances = model.search_query(term).all()
def default_create_func(vc):
    """Instantiate the model from ``vc.serialized_args`` and stage it.

    The new object is added to the database session (not committed here) and
    becomes the sole entry of ``vc.instances``.
    """
    model_cls = vc.model_view.model
    created = model_cls(**vc.serialized_args)
    db.session.add(created)
    vc.instances = [created]
def default_update_func(vc):
    """Copy every serialized argument onto the first instance and stage it.

    Each key in ``vc.serialized_args`` is set as an attribute of
    ``vc.instances[0]``, which is then added to the database session.
    """
    target = vc.instances[0]
    for attr_name, attr_value in vc.serialized_args.items():
        setattr(target, attr_name, attr_value)
    db.session.add(target)
def default_delete_func(vc):
    """Mark the first instance in ``vc.instances`` for deletion in the session."""
    doomed = vc.instances[0]
    db.session.delete(doomed)
def default_render_func(vc):
    """Render ``vc.template`` with the standard template variables.

    Adds ``instances``, ``model_view`` and ``model_views`` to
    ``vc.template_ctx``; ``instance`` is added as well when exactly one
    instance is loaded.
    """
    ctx = vc.template_ctx
    if len(vc.instances) == 1:
        ctx['instance'] = vc.instances[0]
    ctx.update({
        'instances': vc.instances,
        'model_view': vc.model_view,
        'model_views': MODEL_VIEWS,
    })
    return render_template(vc.template, **ctx)
def default_commit_func(vc):
    """Commit the database session; ``vc`` is accepted only to match the hook signature."""
    session = db.session
    session.commit()
def default_none_func(vc):
    """No-op hook: ignore the view context and return ``None``."""
    return None
class ViewContext(object):
    """Bundle of hook callables and mutable state used to serve one view.

    Every ``*_func`` argument is a callable taking the context itself.  A
    falsy argument is replaced by a default: the args-get, redirect, commit,
    jsonify and render hooks have dedicated defaults, all other hooks default
    to ``default_none_func``.  Extra keyword arguments are accepted and
    ignored.
    """

    def __init__(self, args_get_func=None, args_process_func=None,
                 filter_func=None, redirect_func=None,
                 should_execute_func=None, execute_func=None, post_execute_func=None,
                 commit_func=None, post_commit_func=None,
                 jsonify_func=None, render_func=None, template_func=None, template_ctx_func=None,
                 should_redirect_no_instances_func=None,
                 should_redirect_at_end_func=None,
                 json=False, **kwargs):
        # --- input gathering and instance selection ---
        self.args_get_func = args_get_func or default_get_args_func
        self.args_process_func = args_process_func or default_none_func
        self.filter_func = filter_func or default_none_func

        # --- redirect decisions ---
        self.should_redirect_no_instances_func = (
            should_redirect_no_instances_func or default_none_func
        )
        self.should_redirect_at_end_func = (
            should_redirect_at_end_func or default_none_func
        )
        self.redirect_func = redirect_func or default_redirect_func

        # --- execution and persistence ---
        self.should_execute_func = should_execute_func or default_none_func
        self.execute_func = execute_func or default_none_func
        self.post_execute_func = post_execute_func or default_none_func
        self.commit_func = commit_func or default_commit_func
        self.post_commit_func = post_commit_func or default_none_func

        # --- response building ---
        self.jsonify_func = jsonify_func or default_jsonify_func
        self.render_func = render_func or default_render_func
        self.template_func = template_func or default_none_func
        self.template_ctx_func = template_ctx_func or default_none_func

        self.json = json

        # --- mutable state, filled in by the hooks ---
        self.serialized_args = {}
        self.url_args = {}
        self.instances = []
        self.template_ctx = {}
        self.should_execute = True
        self.should_redirect_at_end = True
        self.template = None
        self.model_view = None
        self.redirect_next = None
def create_view(model_view, view_context, is_json=False, **kwargs):
    """Build a view function that drives ``view_context`` through its hooks.

    :param model_view: stored on ``view_context.model_view``.
    :param view_context: object providing the hook callables and state.
    :param is_json: when true, responses come from ``jsonify_func`` instead
        of the template hooks.
    :param kwargs: accepted but not used by this function.
    :return: a callable taking the URL arguments as keyword arguments.

    Request flow of the returned callable:

    1. Store the URL arguments, then run the args-get, args-process and
       filter hooks.
    2. If no instances were found and ``should_redirect_no_instances_func``
       returns a truthy value, redirect.
    3. ``GET`` requests are answered immediately (JSON or template).
    4. Otherwise run ``should_execute_func``; redirect if
       ``should_execute`` is false, else run execute, post-execute, commit
       and post-commit.
    5. Run ``should_redirect_at_end_func``; redirect if
       ``should_redirect_at_end`` is true, else answer with JSON/template.
    """
    view_context.model_view = model_view
    ctx = view_context

    def respond():
        # JSON endpoints skip the template hooks entirely.
        if is_json:
            return ctx.jsonify_func(ctx)
        ctx.template_func(ctx)
        ctx.template_ctx_func(ctx)
        return ctx.render_func(ctx)

    def inner(**kwargs):
        ctx.url_args = kwargs
        for prepare in (ctx.args_get_func, ctx.args_process_func, ctx.filter_func):
            prepare(ctx)

        if not ctx.instances and ctx.should_redirect_no_instances_func(ctx):
            return ctx.redirect_func(ctx)

        if request.method == "GET":
            return respond()

        ctx.should_execute_func(ctx)
        if not ctx.should_execute:
            return ctx.redirect_func(ctx)

        ctx.execute_func(ctx)
        ctx.post_execute_func(ctx)
        ctx.commit_func(ctx)
        ctx.post_commit_func(ctx)

        ctx.should_redirect_at_end_func(ctx)
        if ctx.should_redirect_at_end:
            return ctx.redirect_func(ctx)
        return respond()

    return inner
class ModelView(object):
    """Registers URL rules for one model on an application.

    On construction the view derives a snake_case ``model_name`` and its
    plural ``model_name_pl`` from the model, and records itself in
    ``MODEL_VIEWS`` under ``model_name``.
    """

    def __init__(self, app, model):
        """
        :param app: object exposing ``add_url_rule`` (used when registering).
        :param model: model class, or an object exposing ``name`` when it has
            no ``__name__``.
        """
        self.app = app
        self.model = model
        raw_name = model.__name__ if hasattr(model, "__name__") else model.name
        self.model_name = camel_case_to_snake_case(raw_name)
        self.model_name_pl = inflect.engine().plural(self.model_name)
        MODEL_VIEWS[self.model_name] = self

    def _register_rule(self, url_args, **kwargs):
        """Register a rule twice: as given, and as a JSON variant under ``/api``.

        ``rule``, ``endpoint`` and ``view_func`` are popped from ``url_args``;
        whatever remains is forwarded to ``add_url_rule``.  ``view_func`` is
        called as a factory with this view and ``kwargs``; the API variant is
        built with ``is_json=True`` and the endpoint prefixed with ``api_``.
        """
        url = url_args.pop('rule')
        endpoint = url_args.pop('endpoint')
        view_factory = url_args.pop('view_func')

        self.app.add_url_rule(
            rule=url,
            endpoint=endpoint,
            view_func=view_factory(self, **kwargs),
            **url_args
        )

        api_kwargs = dict(kwargs, is_json=True)
        self.app.add_url_rule(
            rule='/api{}'.format(url),
            endpoint='api_{}'.format(endpoint),
            view_func=view_factory(self, **api_kwargs),
            **url_args
        )

    def register_verb(self, verb, methods=None, per_item=False, **kwargs):
        """Register ``/<plural>[/<uuid>]/<verb>`` and its ``/api`` twin.

        :param verb: last path segment; also the endpoint prefix.
        :param methods: HTTP methods for the rule; defaults to ``["GET"]``.
        :param per_item: when true, a ``<uuid>`` segment precedes the verb.
        :param kwargs: forwarded to ``create_view``.
        """
        pattern = '/{}/<uuid>/{}' if per_item else '/{}/{}'
        url_args = {
            'rule': pattern.format(self.model_name_pl, verb),
            'methods': methods or ["GET"],
            'endpoint': '{}_{}'.format(verb, self.model_name),
            'view_func': create_view,
        }
        self._register_rule(url_args, **kwargs)
def catch_flash(f):
    """Decorator: turn any exception from ``f`` into a flashed error and a redirect.

    On success the wrapped function's return value is passed through.  On
    failure the exception text is flashed in the ``"error"`` category and the
    client is redirected to, in order of preference: the form's ``_next``
    value, the request's referrer (when set and different from the request
    path), or the ``home`` endpoint.
    """
    @wraps(f)
    def wrapper(*args, **kwargs):
        try:
            return f(*args, **kwargs)
        except Exception as err:
            flash(str(err), "error")
            form_data = serialize_form()
            referrer = request.referrer
            if '_next' in form_data:
                destination = form_data.pop('_next')
            elif referrer and referrer != request.path:
                destination = referrer
            else:
                destination = url_for('home')
            return redirect(destination)
    return wrapper