oshipka/oshipka/webapp/views.py

378 lines
14 KiB
Python

import os
import importlib
import json
from collections import defaultdict
from copy import copy
from functools import wraps
import inflect
from flask import flash, render_template, redirect, request, url_for, jsonify, abort
from flask_login import current_user
from flask_security import login_required, roles_required
from sqlalchemy_filters import apply_filters
from oshipka.persistance import db, filter_m_n, update_m_ns, SHARING_TYPE_TYPES_TYPE_PUBLIC, \
SHARING_TYPE_TYPES_TYPE_AUTHZ, SHARING_TYPE_TYPES_TYPE_AUTHN
from oshipka.util.strings import camel_case_to_snake_case
from config import MEDIA_DIR, SECURITY_ENABLED
from webapp.models import Permission
webapp_models = importlib.import_module("webapp.models")
MODEL_VIEWS = dict()
def has_permission(obj, action, instance=None, object_prefix="models", action_prefix="model"):
model_view = None
if object_prefix in ['models', 'columns']:
model_view = MODEL_VIEWS.get(obj, {})
if not model_view:
return False
if current_user.is_anonymous:
permission = Permission.query.filter(Permission.object == "{}.{}".format(object_prefix, obj),
Permission.action == "{}.{}".format(action_prefix, action),
Permission.subject == "public").first()
if permission and permission.is_allowed:
return True
return False
if instance is not None:
inherits = model_view.definitions.get('inherits', [])
if "Ownable" in inherits:
permission = Permission.query.filter(Permission.object == "{}.{}".format(object_prefix, obj),
Permission.object_id == instance.id,
Permission.action == "{}.{}".format(action_prefix, action),
Permission.subject == "owner").first()
if permission and permission.is_allowed:
return True
# LOGGED IN USER
permission = Permission.query.filter(Permission.object == "{}.{}".format(object_prefix, obj),
Permission.action == "{}.{}".format(action_prefix, action),
Permission.subject == "logged").first()
if permission and permission.is_allowed:
return True
# ROLE PERMISSIONS
if hasattr(current_user, "roles"):
roles_ids = [r.id for r in current_user.roles]
role_permissions = Permission.query.filter(Permission.object == "{}.{}".format(object_prefix, obj),
Permission.action == "{}.{}".format(action_prefix, action),
Permission.subject == "user",
Permission.subject_id.in_(roles_ids)).all()
for role_permission in role_permissions:
if role_permission.is_allowed:
return True
# USER PERMISSIONS
user_id = current_user.id
user_permission = Permission.query.filter(Permission.object == "{}.{}".format(object_prefix, obj),
Permission.action == "{}.{}".format(action_prefix, action),
Permission.subject == "user",
Permission.subject_id == user_id).first()
if user_permission:
return user_permission.is_allowed
return False
def default_get_args_func(view_context):
view_context.serialized_args = request.args
def default_get_form_func(vc):
vc.redirect_next = request.form.get('_next')
for k in request.form:
if k.startswith('_m_n_'):
vc.serialized_args[k] = request.form.getlist(k)
elif k.startswith('_is_'):
bool_values = request.form.getlist(k)
bool_value = True if len(bool_values) == 2 else False
vc.serialized_args[k[1:]] = bool_value
for k, f in request.files.items():
if f.filename != '':
filedir = os.path.join(MEDIA_DIR, vc.model_view.model_name_pl)
if not os.path.exists(filedir):
os.makedirs(filedir)
vc.serialized_args[k] = os.path.join(vc.model_view.model_name_pl, f.filename)
filepath = os.path.join(filedir, f.filename)
f.save(filepath)
vc.serialized_args.update(dict(filter(lambda k: not k[0].startswith("_"), dict(request.form).items())))
to_delete = []
for key, value in vc.serialized_args.items():
if key.endswith('_id'):
if value in ['']:
to_delete.append(key)
else:
vc.serialized_args[key] = int(value)
for key in to_delete:
del vc.serialized_args[key]
def default_jsonify_func(vc):
if type(vc.instances) is list:
return jsonify([instance.serialize() for instance in vc.instances])
return jsonify(vc.instances.serialize())
def default_redirect_func(vc):
return redirect(vc.redirect_next or request.referrer or url_for('home'))
def default_get_func(vc):
model = vc.model_view.model
uuid = vc.url_args.get('uuid')
if uuid and vc.url_args.get('uuid').isdigit():
vc.instance = model.query.filter_by(id=uuid).first()
vc.instances = model.query.filter_by(id=uuid).all()
else:
vc.instance = model.query.filter_by(uuid=uuid).first()
vc.instances = model.query.filter_by(uuid=uuid).all()
if not vc.instances:
flash("No {}:{}".format(vc.model_view.model_name, uuid))
def default_list_args_get_func(vc):
vc.serialized_args = request.args
def default_list_func(vc):
per_page = request.args.get('per_page')
order_by = request.args.get('order_by')
order_by_dir = request.args.get('order_by_dir')
# .order_by(getattr(vc.model_view.model, order_by).desc())
pagination = vc.model_view.model.query.paginate(per_page=per_page, error_out=False)
vc.template_ctx['pagination'] = pagination
vc.instances = pagination.items
def get_filters(serialized_args):
return json.loads(serialized_args.get("_filters", "[]"))
def default_search_func(vc):
q = vc.serialized_args.get('q')
if hasattr(vc.model_view.model, 'search_query'):
query = vc.model_view.model.search_query("{q}* or *{q}* or {q}*".format(q=q))
filters = get_filters(vc.serialized_args)
filtered_query = apply_filters(query, filters)
per_page = request.args.get('per_page')
pagination = filtered_query.paginate(per_page=per_page, error_out=False)
vc.template_ctx['pagination'] = pagination
vc.instances = pagination.items
def default_update_func(vc):
m_ns, to_delete = filter_m_n(vc.serialized_args)
for key in to_delete:
del vc.serialized_args[key]
vc.instance = vc.instances[0]
for k, v in vc.serialized_args.items():
if k.startswith("_file_"):
key = k.split('_file_')[1]
current_value = getattr(vc.instance, key)
if current_value and current_value != v:
os.remove(os.path.join(MEDIA_DIR, v))
setattr(vc.instance, key, v)
else:
setattr(vc.instance, k, v)
update_m_ns(vc.instance, m_ns)
db.session.add(vc.instance)
def default_create_func(vc):
instance = vc.instances or vc.model_view.model()
vc.instances = [instance]
default_update_func(vc)
def default_delete_func(vc):
instance = vc.instances[0]
for filecolumn in vc.model_view.model_file_columns:
filename = getattr(instance, filecolumn)
if filename:
try:
os.remove(os.path.join(MEDIA_DIR, filename))
except Exception as e:
flash("Error deleting file: {}".format(e), "error")
db.session.delete(instance)
def default_render_func(vc):
if len(vc.instances) == 1:
vc.template_ctx['instance'] = vc.instances[0]
vc.template_ctx['instances'] = vc.instances
vc.template_ctx['model_view'] = vc.model_view
vc.template_ctx['model_views'] = MODEL_VIEWS
return render_template(vc.template, **vc.template_ctx)
def default_commit_func(vc):
db.session.commit()
def default_none_func(vc):
pass
class ViewContext(object):
def __init__(self, args_get_func=None, args_process_func=None,
filter_func=None, redirect_func=None,
should_execute_func=None, execute_func=None, post_execute_func=None,
commit_func=None, post_commit_func=None,
jsonify_func=None, render_func=None, template_func=None, template_ctx_func=None,
should_redirect_no_instances_func=None,
should_redirect_at_end_func=None,
is_json=False, model_view=None, verb=None, **kwargs):
self.args_get_func = args_get_func or default_get_args_func
self.args_process_func = args_process_func or default_none_func
self.filter_func = filter_func or default_none_func
self.should_redirect_no_instances_func = should_redirect_no_instances_func or default_none_func
self.redirect_func = redirect_func or default_redirect_func
self.should_execute_func = should_execute_func or default_none_func
self.execute_func = execute_func or default_none_func
self.post_execute_func = post_execute_func or default_none_func
self.commit_func = commit_func or default_commit_func
self.post_commit_func = post_commit_func or default_none_func
self.jsonify_func = jsonify_func or default_jsonify_func
self.render_func = render_func or default_render_func
self.template_func = template_func or default_none_func
self.template_ctx_func = template_ctx_func or default_none_func
self.should_redirect_at_end_func = should_redirect_at_end_func or default_none_func
self.is_json = is_json
self.model_view = model_view
self.verb = verb
self.serialized_args = {}
self.url_args = {}
self.instances = []
self.should_execute = True
self.should_redirect_at_end = not is_json
self.template = None
self.template_ctx = {}
self.redirect_next = None
def return_json_or_template(view_context):
if view_context.is_json:
return view_context.jsonify_func(view_context)
view_context.template_func(view_context)
view_context.template_ctx_func(view_context)
return view_context.render_func(view_context)
def create_view(model_view, view_context_kwargs, is_login_required=False, the_roles_required=None, **kwargs):
view_context_kwargs['model_view'] = model_view
the_roles_required = [] if not the_roles_required else the_roles_required
def inner(**kwargs):
view_context = ViewContext(**view_context_kwargs)
model_name = view_context.model_view.model_name
if not has_permission(model_name, view_context.verb):
abort(403)
view_context.url_args = kwargs
view_context.args_get_func(view_context)
view_context.args_process_func(view_context)
view_context.filter_func(view_context)
if not view_context.instances:
if view_context.should_redirect_no_instances_func(view_context):
return view_context.redirect_func(view_context)
if request.method == "GET":
return return_json_or_template(view_context)
view_context.should_execute_func(view_context)
if not view_context.should_execute:
if view_context.is_json:
return view_context.jsonify_func(view_context)
else:
return view_context.redirect_func(view_context)
view_context.execute_func(view_context)
view_context.post_execute_func(view_context)
view_context.commit_func(view_context)
view_context.post_commit_func(view_context)
view_context.should_redirect_at_end_func(view_context)
if view_context.should_redirect_at_end:
return view_context.redirect_func(view_context)
return return_json_or_template(view_context)
if is_login_required:
if the_roles_required:
inner = roles_required(*the_roles_required)(inner)
return login_required(inner)
return inner
class ModelView(object):
def __init__(self, app, model, definitions=None):
self.app = app
self.model = model
self.definitions = definitions or {}
p = inflect.engine()
if hasattr(model, "__name__"):
_model_name = getattr(model, "__name__")
else:
_model_name = model.name
self.model_name = camel_case_to_snake_case(_model_name)
self.model_name_pl = p.plural(self.model_name)
self.model_file_columns = model._file_columns
MODEL_VIEWS[self.model_name] = self
def _register_rule(self, url_args, **kwargs):
url = url_args.pop('rule')
api_url = '/api{}'.format(url)
endpoint = url_args.pop('endpoint')
api_endpoint = 'api_{}'.format(endpoint)
view_func = url_args.pop('view_func')
view_context_kwargs = copy(kwargs['view_context'].__dict__)
kwargs['view_context_kwargs'] = view_context_kwargs
self.app.add_url_rule(rule=url, endpoint=endpoint,
view_func=view_func(self, **kwargs), **url_args)
view_context_kwargs = copy(kwargs['view_context'].__dict__)
view_context_kwargs['is_json'] = True
kwargs['view_context_kwargs'] = view_context_kwargs
self.app.add_url_rule(rule=api_url, endpoint=api_endpoint,
view_func=view_func(self, **kwargs), **url_args)
def register_verb(self, verb, methods=None, per_item=False, **kwargs):
if not methods:
methods = ["GET"]
rule = '/{}'.format(self.model_name_pl)
if per_item:
rule += '/<uuid>'
rule += '/{}'.format(verb)
url_args = dict(
rule=rule,
methods=methods,
endpoint='{}_{}'.format(verb, self.model_name),
view_func=create_view,
)
self._register_rule(url_args, **kwargs)
def catch_flash(f):
@wraps(f)
def inner(*args, **kwargs):
try:
return f(*args, **kwargs)
except Exception as e:
flash(str(e), "error")
serialized_form = {k: v for k, v in request.form.items()}
if '_next' in serialized_form:
_next = serialized_form.pop('_next')
elif request.referrer and request.referrer != request.path:
_next = request.referrer
else:
_next = url_for('home')
return redirect(_next)
return inner