From 79d02aee72efb10a4492843bc11bb249abd4af57 Mon Sep 17 00:00:00 2001 From: Daniel Tsvetkov Date: Sun, 9 May 2021 21:34:11 +0200 Subject: [PATCH] fix media perms --- oshipka/persistance/__init__.py | 44 ++++-------------------- oshipka/webapp/default_routes.py | 20 ++++++++--- oshipka/webapp/views.py | 58 +++++++++++++++++++++++++++++--- vm_gen/templates/html/_get.html | 10 +++--- vm_gen/templates/model_py | 3 +- 5 files changed, 81 insertions(+), 54 deletions(-) diff --git a/oshipka/persistance/__init__.py b/oshipka/persistance/__init__.py index 334e6ff..4b7c0b9 100644 --- a/oshipka/persistance/__init__.py +++ b/oshipka/persistance/__init__.py @@ -248,42 +248,7 @@ def register_filters(app): def bool_filter(v): return bool(v) - def check_instance_perm(model_view, verb, instance): - model_acl = model_view.model_acl - # Anonymous user -> check public ACL - if current_user.is_anonymous: - instance_acl = model_acl.query.filter_by(instance=instance, - acl_type=SHARING_TYPE_TYPES_TYPE_PUBLIC).first() - else: - # Logged in user -> find (user, instance) pair - instance_acl = model_acl.query.filter_by(user=current_user, instance=instance, - acl_type=SHARING_TYPE_TYPES_TYPE_AUTHZ).first() - if not instance_acl: - # If not (user, instance) pair -> check authN ACL - instance_acl = model_acl.query.filter_by(user=current_user, instance=instance, - acl_type=SHARING_TYPE_TYPES_TYPE_AUTHN).first() - column = verb.replace('.', '__') - return getattr(instance_acl, column) - - def has_permission(model, verb, instance=None): - model_view = MODEL_VIEWS.get(model, {}) - if '.' in verb: - return check_instance_perm(model_view, verb, instance) - acl = model_view.model.model_acls.get(verb) - # Anonymous user -> do we require AuthN? - if current_user.is_anonymous: - return not acl.get('authn') - # Not Anonymous user -> Check roles - roles = acl.get('authz') - # No roles required -> has permission - if not roles: - return True - # One role is enough to grant permission - for role in roles: - if current_user.has_role(role): - return True - return False - + from oshipka.webapp.views import has_permission app.jinja_env.globals.update(has_permission=has_permission) @@ -379,7 +344,10 @@ def populate_static(app): model = getattr(models, model_name) model_acl = getattr(models, model_name + 'Acl') with open(os.path.join(STATIC_DATA_DIR, "{}.csv".format(model_name))) as f: - user = User.query.first() + if issubclass(model, Ownable): + user = User.query.first() + else: + user = None reader = csv.DictReader(f) for row in reader: row_updates = dict() @@ -427,6 +395,6 @@ def create_model(model, model_acl, user, serialized_args): for key, ids in m_ns.items(): m_ns[key] = ids.split(',') update_m_ns(instance, m_ns) - if model_acl and user: + if model_acl: create_acls(model_acl, instance, user) return instance diff --git a/oshipka/webapp/default_routes.py b/oshipka/webapp/default_routes.py index f313440..4ea1daf 100644 --- a/oshipka/webapp/default_routes.py +++ b/oshipka/webapp/default_routes.py @@ -1,7 +1,7 @@ import urllib import requests -from flask import send_from_directory, redirect, request, url_for, session, jsonify +from flask import send_from_directory, redirect, request, url_for, session, jsonify, abort from flask_security import login_user from oshipka.util.strings import random_string_generator @@ -9,15 +9,27 @@ from oshipka.webapp import oshipka_bp, app from config import MEDIA_DIR, APP_BASE_URL, SECURITY_ENABLED, SSO_BASE_URL, SSO_CLIENT_ID from sensitive import SSO_CLIENT_SECRET +from oshipka.webapp.views import MODEL_VIEWS, has_permission -# TODO: VULNZ - EVERYONE HAS ACCESS TO THIS -@oshipka_bp.route('/media/') -def get_media(filepath): + +@oshipka_bp.route('/media////') +def get_media(model_name, instance_id, column, filepath): + model_view = MODEL_VIEWS.get(model_name) + if not model_view: + abort(404) + model = model_view.model + instance = model.query.filter_by(id=instance_id).first() + if not instance: + abort(404) + verb = "{}.read".format(column) + if not has_permission(model_name, verb, instance): + abort(401) return send_from_directory(MEDIA_DIR, filepath) if SECURITY_ENABLED: from oshipka.persistance import User, db + app.config['SSO_BASE_URL'] = SSO_BASE_URL SSO_AUTH_URL = '/oidc/auth' diff --git a/oshipka/webapp/views.py b/oshipka/webapp/views.py index 097753e..d9dfca3 100644 --- a/oshipka/webapp/views.py +++ b/oshipka/webapp/views.py @@ -21,6 +21,52 @@ webapp_models = importlib.import_module("webapp.models") MODEL_VIEWS = dict() +def check_instance_perm(model_view, verb, instance): + model_acl = model_view.model_acl + # Anonymous user -> check public ACL + if current_user.is_anonymous: + instance_acl = model_acl.query.filter_by(instance=instance, + acl_type=SHARING_TYPE_TYPES_TYPE_PUBLIC).first() + else: + # Logged in user -> find (user, instance) pair + instance_acl = model_acl.query.filter_by(user=current_user, instance=instance, + acl_type=SHARING_TYPE_TYPES_TYPE_AUTHZ).first() + if not instance_acl: + # If not (user, instance) pair -> check authN ACL + instance_acl = model_acl.query.filter_by(user=current_user, instance=instance, + acl_type=SHARING_TYPE_TYPES_TYPE_AUTHN).first() + if not instance_acl: + # finally, try public + instance_acl = model_acl.query.filter_by(instance=instance, + acl_type=SHARING_TYPE_TYPES_TYPE_PUBLIC).first() + if not instance_acl: + return False + column = verb.replace('.', '__') + return getattr(instance_acl, column) + + +def has_permission(model, verb, instance=None): + model_view = MODEL_VIEWS.get(model, {}) + if not model_view: + return False + if '.' in verb: + return check_instance_perm(model_view, verb, instance) + acl = model_view.model.model_acls.get(verb) + # Anonymous user -> do we require AuthN? + if current_user.is_anonymous: + return not acl.get('authn') + # Not Anonymous user -> Check roles + roles = acl.get('authz') + # No roles required -> has permission + if not roles: + return True + # One role is enough to grant permission + for role in roles: + if current_user.has_role(role): + return True + return False + + def default_get_args_func(view_context): view_context.serialized_args = request.args @@ -129,16 +175,18 @@ def default_create_func(vc): instance = vc.instances or vc.model_view.model() vc.instances = [instance] default_update_func(vc) - create_acls(vc.model_view.model_acl, instance, current_user) + user = current_user if not current_user.is_anonymous else None + create_acls(vc.model_view.model_acl, instance, user) def create_acls(model_acl, instance, user): instance_public_acl = model_acl(user=user, instance=instance, acl_type=SHARING_TYPE_TYPES_TYPE_PUBLIC) - instance_authn_acl = model_acl(user=user, instance=instance, acl_type=SHARING_TYPE_TYPES_TYPE_AUTHN) - instance_authz_acl = model_acl(user=user, instance=instance, acl_type=SHARING_TYPE_TYPES_TYPE_AUTHZ) db.session.add(instance_public_acl) - db.session.add(instance_authn_acl) - db.session.add(instance_authz_acl) + if user: + instance_authn_acl = model_acl(user=user, instance=instance, acl_type=SHARING_TYPE_TYPES_TYPE_AUTHN) + instance_authz_acl = model_acl(user=user, instance=instance, acl_type=SHARING_TYPE_TYPES_TYPE_AUTHZ) + db.session.add(instance_authn_acl) + db.session.add(instance_authz_acl) def default_delete_func(vc): diff --git a/vm_gen/templates/html/_get.html b/vm_gen/templates/html/_get.html index 8971962..abf41dd 100644 --- a/vm_gen/templates/html/_get.html +++ b/vm_gen/templates/html/_get.html @@ -10,14 +10,14 @@ [%- endif %] [%- endif %] [%- if column.type in ['picture', 'image', 'img'] %] - + [%- elif column.type in ['video'] %] -