fix media perms

This commit is contained in:
Daniel Tsvetkov 2021-05-09 21:34:11 +02:00
parent 3a539d698d
commit 79d02aee72
5 changed files with 81 additions and 54 deletions

View File

@ -248,42 +248,7 @@ def register_filters(app):
def bool_filter(v):
return bool(v)
def check_instance_perm(model_view, verb, instance):
model_acl = model_view.model_acl
# Anonymous user -> check public ACL
if current_user.is_anonymous:
instance_acl = model_acl.query.filter_by(instance=instance,
acl_type=SHARING_TYPE_TYPES_TYPE_PUBLIC).first()
else:
# Logged in user -> find (user, instance) pair
instance_acl = model_acl.query.filter_by(user=current_user, instance=instance,
acl_type=SHARING_TYPE_TYPES_TYPE_AUTHZ).first()
if not instance_acl:
# If not (user, instance) pair -> check authN ACL
instance_acl = model_acl.query.filter_by(user=current_user, instance=instance,
acl_type=SHARING_TYPE_TYPES_TYPE_AUTHN).first()
column = verb.replace('.', '__')
return getattr(instance_acl, column)
def has_permission(model, verb, instance=None):
model_view = MODEL_VIEWS.get(model, {})
if '.' in verb:
return check_instance_perm(model_view, verb, instance)
acl = model_view.model.model_acls.get(verb)
# Anonymous user -> do we require AuthN?
if current_user.is_anonymous:
return not acl.get('authn')
# Not Anonymous user -> Check roles
roles = acl.get('authz')
# No roles required -> has permission
if not roles:
return True
# One role is enough to grant permission
for role in roles:
if current_user.has_role(role):
return True
return False
from oshipka.webapp.views import has_permission
app.jinja_env.globals.update(has_permission=has_permission)
@ -379,7 +344,10 @@ def populate_static(app):
model = getattr(models, model_name)
model_acl = getattr(models, model_name + 'Acl')
with open(os.path.join(STATIC_DATA_DIR, "{}.csv".format(model_name))) as f:
user = User.query.first()
if issubclass(model, Ownable):
user = User.query.first()
else:
user = None
reader = csv.DictReader(f)
for row in reader:
row_updates = dict()
@ -427,6 +395,6 @@ def create_model(model, model_acl, user, serialized_args):
for key, ids in m_ns.items():
m_ns[key] = ids.split(',')
update_m_ns(instance, m_ns)
if model_acl and user:
if model_acl:
create_acls(model_acl, instance, user)
return instance

View File

@ -1,7 +1,7 @@
import urllib
import requests
from flask import send_from_directory, redirect, request, url_for, session, jsonify
from flask import send_from_directory, redirect, request, url_for, session, jsonify, abort
from flask_security import login_user
from oshipka.util.strings import random_string_generator
@ -9,15 +9,27 @@ from oshipka.webapp import oshipka_bp, app
from config import MEDIA_DIR, APP_BASE_URL, SECURITY_ENABLED, SSO_BASE_URL, SSO_CLIENT_ID
from sensitive import SSO_CLIENT_SECRET
from oshipka.webapp.views import MODEL_VIEWS, has_permission
# TODO: VULNZ - EVERYONE HAS ACCESS TO THIS
@oshipka_bp.route('/media/<path:filepath>')
def get_media(filepath):
@oshipka_bp.route('/media/<model_name>/<int:instance_id>/<column>/<path:filepath>')
def get_media(model_name, instance_id, column, filepath):
model_view = MODEL_VIEWS.get(model_name)
if not model_view:
abort(404)
model = model_view.model
instance = model.query.filter_by(id=instance_id).first()
if not instance:
abort(404)
verb = "{}.read".format(column)
if not has_permission(model_name, verb, instance):
abort(401)
return send_from_directory(MEDIA_DIR, filepath)
if SECURITY_ENABLED:
from oshipka.persistance import User, db
app.config['SSO_BASE_URL'] = SSO_BASE_URL
SSO_AUTH_URL = '/oidc/auth'

View File

@ -21,6 +21,52 @@ webapp_models = importlib.import_module("webapp.models")
MODEL_VIEWS = dict()
def check_instance_perm(model_view, verb, instance):
model_acl = model_view.model_acl
# Anonymous user -> check public ACL
if current_user.is_anonymous:
instance_acl = model_acl.query.filter_by(instance=instance,
acl_type=SHARING_TYPE_TYPES_TYPE_PUBLIC).first()
else:
# Logged in user -> find (user, instance) pair
instance_acl = model_acl.query.filter_by(user=current_user, instance=instance,
acl_type=SHARING_TYPE_TYPES_TYPE_AUTHZ).first()
if not instance_acl:
# If not (user, instance) pair -> check authN ACL
instance_acl = model_acl.query.filter_by(user=current_user, instance=instance,
acl_type=SHARING_TYPE_TYPES_TYPE_AUTHN).first()
if not instance_acl:
# finally, try public
instance_acl = model_acl.query.filter_by(instance=instance,
acl_type=SHARING_TYPE_TYPES_TYPE_PUBLIC).first()
if not instance_acl:
return False
column = verb.replace('.', '__')
return getattr(instance_acl, column)
def has_permission(model, verb, instance=None):
model_view = MODEL_VIEWS.get(model, {})
if not model_view:
return False
if '.' in verb:
return check_instance_perm(model_view, verb, instance)
acl = model_view.model.model_acls.get(verb)
# Anonymous user -> do we require AuthN?
if current_user.is_anonymous:
return not acl.get('authn')
# Not Anonymous user -> Check roles
roles = acl.get('authz')
# No roles required -> has permission
if not roles:
return True
# One role is enough to grant permission
for role in roles:
if current_user.has_role(role):
return True
return False
def default_get_args_func(view_context):
view_context.serialized_args = request.args
@ -129,16 +175,18 @@ def default_create_func(vc):
instance = vc.instances or vc.model_view.model()
vc.instances = [instance]
default_update_func(vc)
create_acls(vc.model_view.model_acl, instance, current_user)
user = current_user if not current_user.is_anonymous else None
create_acls(vc.model_view.model_acl, instance, user)
def create_acls(model_acl, instance, user):
instance_public_acl = model_acl(user=user, instance=instance, acl_type=SHARING_TYPE_TYPES_TYPE_PUBLIC)
instance_authn_acl = model_acl(user=user, instance=instance, acl_type=SHARING_TYPE_TYPES_TYPE_AUTHN)
instance_authz_acl = model_acl(user=user, instance=instance, acl_type=SHARING_TYPE_TYPES_TYPE_AUTHZ)
db.session.add(instance_public_acl)
db.session.add(instance_authn_acl)
db.session.add(instance_authz_acl)
if user:
instance_authn_acl = model_acl(user=user, instance=instance, acl_type=SHARING_TYPE_TYPES_TYPE_AUTHN)
instance_authz_acl = model_acl(user=user, instance=instance, acl_type=SHARING_TYPE_TYPES_TYPE_AUTHZ)
db.session.add(instance_authn_acl)
db.session.add(instance_authz_acl)
def default_delete_func(vc):

View File

@ -10,14 +10,14 @@
[%- endif %]
[%- endif %]
[%- if column.type in ['picture', 'image', 'img'] %]
<img src="{{ url_for('oshipka_bp.get_media', filepath=instance.[[ column.name ]]) }}" id="display-[[ name|camel_to_snake ]]-[[ column.name ]]" />
<img src="{{ url_for('oshipka_bp.get_media', model_name='[[ name|camel_to_snake ]]', instance_id=instance.id, column='[[ column.name ]]', filepath=instance.[[ column.name ]]) }}" id="display-[[ name|camel_to_snake ]]-[[ column.name ]]" />
[%- elif column.type in ['video'] %]
<video src="{{ url_for('oshipka_bp.get_media', filepath=instance.[[ column.name ]]) }}" controls class="video-inline" id="display-[[ name|camel_to_snake ]]-[[ column.name ]]">
<source src="{{ url_for('oshipka_bp.get_media', filepath=instance.[[ column.name ]]) }}" type="video/mp4">
<source src="{{ url_for('oshipka_bp.get_media', filepath=instance.[[ column.name ]]) }}" type="video/webm">
<video src="{{ url_for('oshipka_bp.get_media', model_name='[[ name|camel_to_snake ]]', instance_id=instance.id, column='[[ column.name ]]', filepath=instance.[[ column.name ]]) }}" controls class="video-inline" id="display-[[ name|camel_to_snake ]]-[[ column.name ]]">
<source src="{{ url_for('oshipka_bp.get_media', model_name='[[ name|camel_to_snake ]]', instance_id=instance.id, column='[[ column.name ]]', filepath=instance.[[ column.name ]]) }}" type="video/mp4">
<source src="{{ url_for('oshipka_bp.get_media', model_name='[[ name|camel_to_snake ]]', instance_id=instance.id, column='[[ column.name ]]', filepath=instance.[[ column.name ]]) }}" type="video/webm">
</video>
[%- elif column.type in ['audio'] %]
<audio src="{{ url_for('oshipka_bp.get_media', filepath=instance.[[ column.name ]]) }}" controls id="display-[[ name|camel_to_snake ]]-[[ column.name ]]"></audio>
<audio src="{{ url_for('oshipka_bp.get_media', model_name='[[ name|camel_to_snake ]]', instance_id=instance.id, column='[[ column.name ]]', filepath=instance.[[ column.name ]]) }}" controls id="display-[[ name|camel_to_snake ]]-[[ column.name ]]"></audio>
[%- elif column.type in ['relationship'] %]
[%- if column.multiple %]
<ul>

View File

@ -1,5 +1,4 @@
from oshipka.persistance import db, ModelController, index_service, LiberalBoolean[% for inherit in interits %], [[ inherit ]][% endfor %]
from oshipka.persistance import db, ModelController, index_service, LiberalBoolean, Ownable
[%- if imports %]
[%- for import in imports %]
[[ import ]]