test permissions after sso

This commit is contained in:
Daniel Tsvetkov 2021-05-09 17:27:22 +02:00
parent 55742c2104
commit 3a539d698d
2 changed files with 15 additions and 6 deletions

View File

@ -252,7 +252,7 @@ def register_filters(app):
model_acl = model_view.model_acl model_acl = model_view.model_acl
# Anonymous user -> check public ACL # Anonymous user -> check public ACL
if current_user.is_anonymous: if current_user.is_anonymous:
instance_acl = model_acl.query.filter_by(user=current_user, instance=instance, instance_acl = model_acl.query.filter_by(instance=instance,
acl_type=SHARING_TYPE_TYPES_TYPE_PUBLIC).first() acl_type=SHARING_TYPE_TYPES_TYPE_PUBLIC).first()
else: else:
# Logged in user -> find (user, instance) pair # Logged in user -> find (user, instance) pair
@ -374,9 +374,12 @@ def populate_static(app):
for model_name in ordered_model_names: for model_name in ordered_model_names:
if SECURITY_ENABLED and model_name in ['User', 'Role']: if SECURITY_ENABLED and model_name in ['User', 'Role']:
model = eval(model_name) model = eval(model_name)
model_acl = None
else: else:
model = getattr(models, model_name) model = getattr(models, model_name)
model_acl = getattr(models, model_name + 'Acl')
with open(os.path.join(STATIC_DATA_DIR, "{}.csv".format(model_name))) as f: with open(os.path.join(STATIC_DATA_DIR, "{}.csv".format(model_name))) as f:
user = User.query.first()
reader = csv.DictReader(f) reader = csv.DictReader(f)
for row in reader: for row in reader:
row_updates = dict() row_updates = dict()
@ -387,7 +390,7 @@ def populate_static(app):
row_updates[key] = sensitive_value row_updates[key] = sensitive_value
if row_updates: if row_updates:
row.update(row_updates) row.update(row_updates)
instance = create_model(model, row) instance = create_model(model, model_acl, user, row)
db.session.add(instance) db.session.add(instance)
db.session.commit() db.session.commit()
print("Finished populating") print("Finished populating")
@ -413,7 +416,8 @@ def update_m_ns(instance, m_ns):
setattr(instance, key, children) setattr(instance, key, children)
def create_model(model, serialized_args): def create_model(model, model_acl, user, serialized_args):
from oshipka.webapp.views import create_acls
m_ns, to_delete = filter_m_n(serialized_args) m_ns, to_delete = filter_m_n(serialized_args)
for key in to_delete: for key in to_delete:
del serialized_args[key] del serialized_args[key]
@ -423,4 +427,6 @@ def create_model(model, serialized_args):
for key, ids in m_ns.items(): for key, ids in m_ns.items():
m_ns[key] = ids.split(',') m_ns[key] = ids.split(',')
update_m_ns(instance, m_ns) update_m_ns(instance, m_ns)
if model_acl and user:
create_acls(model_acl, instance, user)
return instance return instance

View File

@ -129,10 +129,13 @@ def default_create_func(vc):
instance = vc.instances or vc.model_view.model() instance = vc.instances or vc.model_view.model()
vc.instances = [instance] vc.instances = [instance]
default_update_func(vc) default_update_func(vc)
create_acls(vc.model_view.model_acl, instance, current_user)
instance_public_acl = vc.model_view.model_acl(user=current_user, instance=instance, acl_type=SHARING_TYPE_TYPES_TYPE_PUBLIC)
instance_authn_acl = vc.model_view.model_acl(user=current_user, instance=instance, acl_type=SHARING_TYPE_TYPES_TYPE_AUTHN) def create_acls(model_acl, instance, user):
instance_authz_acl = vc.model_view.model_acl(user=current_user, instance=instance, acl_type=SHARING_TYPE_TYPES_TYPE_AUTHZ) instance_public_acl = model_acl(user=user, instance=instance, acl_type=SHARING_TYPE_TYPES_TYPE_PUBLIC)
instance_authn_acl = model_acl(user=user, instance=instance, acl_type=SHARING_TYPE_TYPES_TYPE_AUTHN)
instance_authz_acl = model_acl(user=user, instance=instance, acl_type=SHARING_TYPE_TYPES_TYPE_AUTHZ)
db.session.add(instance_public_acl) db.session.add(instance_public_acl)
db.session.add(instance_authn_acl) db.session.add(instance_authn_acl)
db.session.add(instance_authz_acl) db.session.add(instance_authz_acl)