initial permissions table

This commit is contained in:
Daniel Tsvetkov 2021-05-11 00:14:25 +02:00
parent 284cea56de
commit 878a7c5947
19 changed files with 298 additions and 206 deletions

View File

View File

@ -0,0 +1,18 @@
name: Permission
access:
- verb: all
login_required: true
columns:
- name: subject
- name: subject_id
type: int
- name: action
- name: object
- name: subject_id
type: int
- name: is_allowed
type: boolean
display:
primary: subject
secondary: action
tertiary: object

View File

@ -328,6 +328,59 @@ def init_db(app):
SENSITIVE_PREFIX = "__SENSITIVE__."
"""
role,1,permission.get,models.song,,1
role,1,permission.add_user,models.song,,1
role,1,permission.add_role,models.song,,1
role,1,permission.update,models.song,,1
role,1,permission.delete_user,models.song,,1
role,1,permission.delete_user_self,models.song,,1
role,1,permission.delete_role,models.song,,1
role,1,permission.change_owner,models.song,,1
role,1,model.get,models.song,,1
role,1,model.list,models.song,,1
role,1,model.create,models.song,,1
role,1,model.update,models.song,,1
role,1,model.delete,models.song,,1
public,,column.get,columns.song.audio_filename.read,,1
public,,column.get,columns.song.audio_filename.write,,1
"""
DEFAULT_PERMISSION_PERMISSIONS = ['get', 'add_user', 'add_role', 'delete_user', 'delete_role']
DEFAULT_MODEL_PERMISSIONS = ['get', 'list', 'create', 'update', 'delete']
DEFAULT_COLUMN_PERMISSIONS = ['read', 'write']
DEFAULT_SUBJECTS = ['public', 'logged']
def generate_permissions():
from oshipka.webapp.views import MODEL_VIEWS
with open(os.path.join(STATIC_DATA_DIR, "Permission.csv"), 'w') as f:
f.write("subject,subject_id,action,object,object_id,is_allowed\n")
for permission in DEFAULT_PERMISSION_PERMISSIONS:
f.write("role,1,permission.{},admin.permissions,,1\n".format(permission))
for model, model_view in MODEL_VIEWS.items():
if model in ['permission']:
continue
is_ownable = 'Ownable' in model_view.definitions.get('inherits', [])
subjects = DEFAULT_SUBJECTS + ['owner']if is_ownable else DEFAULT_SUBJECTS
for subject in subjects:
for permission in DEFAULT_PERMISSION_PERMISSIONS:
f.write("{},,permission.{},models.{},,0\n".format(subject, permission, model))
f.write("role,1,permission.{},models.{},,1\n".format(permission, model))
f.write("{},,permission.update,models.{},,0\n".format(subject, model))
f.write("role,1,permission.update,models.{},,1\n".format(subject, model))
f.write("{},,permission.delete_user_self,models.{},,0\n".format(subject, model))
f.write("role,1,permission.delete_user_self,models.{},,1\n".format(subject, model))
if is_ownable:
f.write("{},,permission.change_owner,models.{},,1\n".format(subject, model))
for permission in DEFAULT_MODEL_PERMISSIONS:
f.write("{},,model.{},models.{},,1\n".format(subject, permission, model))
for column in model_view.definitions.get('columns'):
column_name = column.get('name')
for permission in DEFAULT_COLUMN_PERMISSIONS:
f.write("{},,column.{}.{},columns.{},,1\n".format(subject, column_name, permission, model))
def populate_static(app):
print("populating...")
@ -341,10 +394,10 @@ def populate_static(app):
for model_name in ordered_model_names:
if SECURITY_ENABLED and model_name in ['User', 'Role']:
model = eval(model_name)
model_acl = None
else:
if SECURITY_ENABLED and model_name in ['Permission']:
generate_permissions()
model = getattr(models, model_name)
model_acl = getattr(models, model_name + 'Acl')
with open(os.path.join(STATIC_DATA_DIR, "{}.csv".format(model_name))) as f:
if issubclass(model, Ownable):
user = User.query.first()
@ -360,7 +413,7 @@ def populate_static(app):
row_updates[key] = sensitive_value
if row_updates:
row.update(row_updates)
instance = create_model(model, model_acl, user, row)
instance = create_model(model, user, row)
db.session.add(instance)
db.session.commit()
print("Finished populating")
@ -389,8 +442,7 @@ def update_m_ns(instance, m_ns):
setattr(instance, key, children)
def create_model(model, model_acl, user, serialized_args):
from oshipka.webapp.views import create_acls
def create_model(model, user, serialized_args):
m_ns, to_delete = filter_m_n(serialized_args)
for key in to_delete:
del serialized_args[key]
@ -400,6 +452,4 @@ def create_model(model, model_acl, user, serialized_args):
for key, ids in m_ns.items():
m_ns[key] = ids.split(',')
update_m_ns(instance, m_ns)
if model_acl:
create_acls(model_acl, instance, user)
return instance

View File

@ -1,8 +1,10 @@
import urllib
from collections import defaultdict
import requests
from flask import send_from_directory, redirect, request, url_for, session, jsonify, abort, render_template
from flask_login import login_required, current_user
from sqlalchemy import and_
from oshipka.util.strings import random_string_generator
from oshipka.webapp import oshipka_bp, app
@ -29,7 +31,8 @@ def get_media(model_name, instance_id, column, filepath):
if SECURITY_ENABLED:
from flask_security import login_user, roles_required
from oshipka.persistance import User, db
from oshipka.persistance import User, Role, db
from webapp.models import Permission
app.config['SSO_BASE_URL'] = SSO_BASE_URL
@ -102,8 +105,120 @@ if SECURITY_ENABLED:
return response.text
@oshipka_bp.route('/permissions')
def get_permissions_table(query):
permissions = Permission.query.filter(query).all()
permissions_table = dict(
selected_roles=list(),
selected_users=list(),
actions=set(),
table=defaultdict(dict),
)
for p in permissions:
if p.subject in ['user']:
permissions_table['selected_users'].append(p.subject_id)
if p.subject in ['role']:
permissions_table['selected_roles'].append(p.subject_id)
s = "{}_{}".format(p.subject, p.subject_id) if p.subject_id else p.subject
permissions_table["table"][s][p.action] = p.is_allowed
permissions_table["actions"].add(p.action)
permissions_table["actions"] = sorted(permissions_table["actions"])
return permissions_table
@oshipka_bp.route('/permissions', methods=['GET', 'POST'])
@login_required
@roles_required(*['admin'])
def get_permissions():
return render_template('permissions.html', MODEL_VIEWS=MODEL_VIEWS, users=User.query.all())
admin_permissions_table = get_permissions_table(Permission.object == 'admin.permissions')
model_tables = {}
for mv in MODEL_VIEWS:
model_tables[mv] = get_permissions_table(and_(
Permission.action == 'permission.get',
Permission.object == 'models.{}'.format(mv)
))
return render_template('permissions_admin.html',
admin_permissions_table=admin_permissions_table,
model_tables=model_tables,
model_views=[k for k in MODEL_VIEWS.keys() if k not in ['permission']],
users=User.query.all(),
roles=Role.query.all(),
)
def get_model_tables(model_name, instance_id=None):
model_view = MODEL_VIEWS.get(model_name)
if not model_view:
abort(404)
if instance_id:
model_query = and_(Permission.object == 'models.{}'.format(model_name),
Permission.object_id == instance_id)
column_query = and_(Permission.object == 'columns.{}'.format(model_name),
Permission.object_id == instance_id)
else:
model_query = Permission.object == 'models.{}'.format(model_name)
column_query = Permission.object == 'columns.{}'.format(model_name)
model_permissions_table = get_permissions_table(model_query)
column_permissions_table = get_permissions_table(column_query)
return model_permissions_table, column_permissions_table
@app.route("/permissions/<model_name>", methods=['GET', 'POST'])
@login_required
def model_permissions(model_name):
if request.method == "POST":
data = {k: len(request.form.getlist(k)) - 1 for k in request.form.keys()}
for k, v in data.items():
if k in ['csrf_token']:
continue
_, subject, action = k.split('-')
sub_split, subject_id = subject.split('_'), None
if len(sub_split) == 2:
subject, subject_id = sub_split
if action.startswith('model.') or action.startswith('permission.'):
obj = "models.{}"
elif action.startswith('column.'):
obj = "columns.{}"
else:
obj = "{}"
abort(403)
if not subject_id:
existing_permission = Permission.query.filter_by(
subject=subject, action=action, object=obj.format(model_name)).first()
else:
existing_permission = Permission.query.filter_by(
subject=subject, subject_id=subject_id, action=action, object=obj.format(model_name)).first()
if not existing_permission:
existing_permission = Permission(
subject=subject, subject_id=subject_id, action=action, object=obj.format(model_name),
)
existing_permission.is_allowed = v
db.session.add(existing_permission)
db.session.commit()
return redirect(request.referrer)
model_permissions_table, column_permissions_table = get_model_tables(model_name)
return render_template("permissions_model.html",
model_permissions_table=model_permissions_table,
column_permissions_table=column_permissions_table,
model_name=model_name,
users=User.query.all(),
roles=Role.query.all(),
)
@app.route("/permissions/<model_name>/<int:instance_id>", methods=['GET', 'POST'])
@login_required
def instance_permissions(model_name, instance_id):
model_view = MODEL_VIEWS.get(model_name)
if not model_view:
abort(404)
instance = model_view.model.query.filter_by(id=instance_id).first()
if not instance:
abort(404)
model_permissions_table, column_permissions_table = get_model_tables(model_name, instance_id)
return render_template("permissions_instance.html",
model_permissions_table=model_permissions_table,
column_permissions_table=column_permissions_table,
model_name=model_name,
instance=instance,
)

View File

@ -0,0 +1,26 @@
<table>
<tr>
<th></th>
{% for subject in instance_table['table'].keys() %}
<th>{{ subject }}</th>
{% endfor %}
</tr>
{% for action in instance_table['actions'] %}
<tr>
<td>{{ action }}</td>
{% for subject in instance_table['table'].keys() %}
<td>
<input type="hidden"
name="is-{{ subject }}-{{ action }}" value="0" />
<input type="checkbox"
name="is-{{ subject }}-{{ action }}" value="1"
id="input-checkbox-{{ subject }}-{{ action }}"
{% if instance_table['table'][subject][action] %}
checked="checked"
{% endif %}
/>
</td>
{% endfor %}
</tr>
{% endfor %}
</table>

View File

@ -0,0 +1,17 @@
<form method="post">
<input type="hidden" name="csrf_token" value="{{ csrf_token() }}"/>
{% set selected_users = model_permissions_table['selected_users'] %}
{% set selected_roles = model_permissions_table['selected_roles'] %}
<button type="submit">{{ _("Update") }}</button>
<br><br>
{% include "_users_roles_multiselect.html" %}
<br><br>
{% set instance_table = model_permissions_table %}
{% include "_permission_table.html" %}
<h2>Instance Permissions</h2>
{% set instance_table = column_permissions_table %}
{% include "_permission_table.html" %}
<button type="submit">{{ _("Update") }}</button>
</form>

View File

@ -0,0 +1,20 @@
<label>Users:
<select multiple>
{% for user in users %}
<option value="{{ user.id }}"
{% if user.id in selected_users %}selected="selected"{% endif %}
>user {{ user.id }} ({{ user.username }})
</option>
{% endfor %}
</select>
</label>
<label>Roles:
<select multiple>
{% for role in roles %}
<option value="{{ role.id }}"
{% if role.id in selected_roles %}selected="selected"{% endif %}
>role {{ role.id }}_({{ role.name }})
</option>
{% endfor %}
</select>
</label>

View File

@ -1,12 +0,0 @@
{% extends "layout.html" %}
{% block content %}
<h1>Admin permissions</h1>
<p>{{ _("Who can access the admin permissions page (this one!):") }}</p>
{% include "users_roles_multiselect.html" %}
{% for mv in MODEL_VIEWS %}
<h2><a href="{{ url_for(mv + '_model_permissions') }}">{{ mv }}</a></h2>
<p>{{ _("Who can access the permissions page for") }} {{ mv }}</p>
{% include "users_roles_multiselect.html" %}
{% endfor %}
{% endblock %}

View File

@ -0,0 +1,19 @@
{% extends "layout.html" %}
{% block content %}
<h1>Admin permissions</h1>
{{ admin_permissions }}
<p>{{ _("Who can access the admin permissions page (this one!):") }}</p>
{% set selected_users = admin_permissions_table['selected_users'] %}
{% set selected_roles = admin_permissions_table['selected_roles'] %}
{% include "_users_roles_multiselect.html" %}
{% set instance_table = admin_permissions_table %}
{% include "_permission_table.html" %}
{% for mv in model_views %}
<h2><a href="{{ url_for('model_permissions', model_name=mv) }}">{{ mv }}</a></h2>
<p>{{ _("Who can access the permissions page for") }} {{ mv }}</p>
{% set selected_users = model_tables[mv]['selected_users'] %}
{% set selected_roles = model_tables[mv]['selected_roles'] %}
{% include "_users_roles_multiselect.html" %}
{% endfor %}
{% endblock %}

View File

@ -0,0 +1,7 @@
{% extends "layout.html" %}
{% block content %}
<h2>{{ _("Instance permissions for ") }} {{ model_name }}: {% include model_name + "/_title.html" %} </h2>
<p>{{ _("Who has specific permissions for this instance") }}</p>
{% include "_permissions.html" %}
{% endblock %}

View File

@ -0,0 +1,7 @@
{% extends "layout.html" %}
{% block content %}
<h2>{{ _("Default permissions for") }} {{ model_name }} </h2>
<p>{{ _("Who can access the model pages for") }} {{ model_name }}</p>
{% include "_permissions.html" %}
{% endblock %}

View File

@ -1,16 +0,0 @@
<form>
<label>Users:
<select multiple>
{% for user in users %}
<option value="{{ user.id }}">{{ user.username }}</option>
{% endfor %}
</select>
</label>
<label>Roles:
<select multiple>
{% for user in roles %}
<option value="{{ user.id }}">{{ user.username }}</option>
{% endfor %}
</select>
</label>
</form>

View File

@ -22,27 +22,7 @@ MODEL_VIEWS = dict()
def check_instance_perm(model_view, verb, instance):
model_acl = model_view.model_acl
# Anonymous user -> check public ACL
if current_user.is_anonymous:
instance_acl = model_acl.query.filter_by(instance=instance,
acl_type=SHARING_TYPE_TYPES_TYPE_PUBLIC).first()
else:
# Logged in user -> find (user, instance) pair
instance_acl = model_acl.query.filter_by(user=current_user, instance=instance,
acl_type=SHARING_TYPE_TYPES_TYPE_AUTHZ).first()
if not instance_acl:
# If not (user, instance) pair -> check authN ACL
instance_acl = model_acl.query.filter_by(user=current_user, instance=instance,
acl_type=SHARING_TYPE_TYPES_TYPE_AUTHN).first()
if not instance_acl:
# finally, try public
instance_acl = model_acl.query.filter_by(instance=instance,
acl_type=SHARING_TYPE_TYPES_TYPE_PUBLIC).first()
if not instance_acl:
return False
column = verb.replace('.', '__')
return getattr(instance_acl, column)
return True
def has_permission(model, verb, instance=None):
@ -51,20 +31,7 @@ def has_permission(model, verb, instance=None):
return False
if '.' in verb:
return check_instance_perm(model_view, verb, instance)
acl = model_view.model.model_acls.get(verb)
# Anonymous user -> do we require AuthN?
if current_user.is_anonymous:
return not acl.get('authn')
# Not Anonymous user -> Check roles
roles = acl.get('authz')
# No roles required -> has permission
if not roles:
return True
# One role is enough to grant permission
for role in roles:
if current_user.has_role(role):
return True
return False
return True
def default_get_args_func(view_context):
@ -144,7 +111,7 @@ def get_filters(serialized_args):
def default_search_func(vc):
q = vc.serialized_args.get('q')
if hasattr(vc.model_view.model, 'search_query'):
query = vc.model_view.model.search_query("*{q}*".format(q=q))
query = vc.model_view.model.search_query("{q}* or *{q}* or {q}*".format(q=q))
filters = get_filters(vc.serialized_args)
filtered_query = apply_filters(query, filters)
per_page = request.args.get('per_page')
@ -175,18 +142,6 @@ def default_create_func(vc):
instance = vc.instances or vc.model_view.model()
vc.instances = [instance]
default_update_func(vc)
user = current_user if not current_user.is_anonymous else None
create_acls(vc.model_view.model_acl, instance, user)
def create_acls(model_acl, instance, user):
instance_public_acl = model_acl(user=user, instance=instance, acl_type=SHARING_TYPE_TYPES_TYPE_PUBLIC)
db.session.add(instance_public_acl)
instance_authn_acl = model_acl(instance=instance, acl_type=SHARING_TYPE_TYPES_TYPE_AUTHN)
db.session.add(instance_authn_acl)
if user:
instance_authz_acl = model_acl(user=user, instance=instance, acl_type=SHARING_TYPE_TYPES_TYPE_AUTHZ)
db.session.add(instance_authz_acl)
def default_delete_func(vc):
@ -308,10 +263,10 @@ def create_view(model_view, view_context_kwargs, is_login_required=False, the_ro
class ModelView(object):
def __init__(self, app, model, model_acl=None):
def __init__(self, app, model, definitions=None):
self.app = app
self.model = model
self.model_acl = model_acl
self.definitions = definitions or {}
p = inflect.engine()
if hasattr(model, "__name__"):

View File

@ -1,42 +0,0 @@
<table>
<tr>
<th></th>
<th>{{ _("Public") }}</th>
<th>{{ _("Logged") }}</th>
[%- if 'Ownable' in inherits %]
<th>{{ _("Owner") }}</th>
[%- endif %]
</tr>
{% for verb in ['list', 'get', 'update', 'delete'] %}
<tr>
<td>{{ verb }}</td>
{% for scope in ['public', 'logged'[%- if 'Ownable' in inherits %], 'owner'[%- endif %] ] %}
<td><input type="checkbox" id="[[ name ]]-{{ scope }}-{{ verb }}" checked="checked"/></td>
{% endfor %}
</tr>
{% endfor %}
</table>
<h2>Instance Permissions</h2>
<table>
<tr>
<th></th>
<th>{{ _("Public") }}</th>
<th>{{ _("Logged") }}</th>
[%- if 'Ownable' in inherits %]
<th>{{ _("Owner") }}</th>
[%- endif %]
</tr>
{% for column in [[ columns ]] %}
<tr>
<td>{{ column.name }}</td>
{% for scope in ['public', 'logged'[%- if 'Ownable' in inherits %], 'owner'[%- endif %] ] %}
<td>
<input type="checkbox" id="[[ name ]]-{{ column }}-{{ scope }}-read" checked="checked"/> r
<input type="checkbox" id="[[ name ]]-{{ column }}-{{ scope }}-write" checked="checked"/> w
</td>
{% endfor %}
</tr>
{% endfor %}
</table>

View File

@ -1,9 +0,0 @@
{% extends "layout.html" %}
{% block content %}
<h2>{{ _("Instance permissions for ") }} {{ _("[[ name ]]") }}: {% include "[[ name|camel_to_snake ]]/_title.html" %} </h2>
<p>{{ _("Who has specific permissions for this instance") }}</p>
{% include "users_roles_multiselect.html" %}
<br><br>
{% include "[[ name|camel_to_snake ]]/_permissions.html" %}
{% endblock %}

View File

@ -1,9 +0,0 @@
{% extends "layout.html" %}
{% block content %}
<h2>{{ _("Default permissions for") }} {{ _("[[ name ]]") }} </h2>
<p>{{ _("Who can access the model pages for") }} {{ _("[[ name ]]") }}</p>
{% include "users_roles_multiselect.html" %}
<br><br>
{% include "[[ name|camel_to_snake ]]/_permissions.html" %}
{% endblock %}

View File

@ -1,21 +0,0 @@
from sqlalchemy_utils import ChoiceType
from oshipka.persistance import db, ModelController
from oshipka.persistance import SHARING_TYPE_TYPES, SHARING_TYPE_TYPES_TYPE_PUBLIC
class [[ name ]]Acl(db.Model, ModelController):
user_id = db.Column(db.Integer, db.ForeignKey('user.id'))
user = db.relationship('User', backref=db.backref("[[ name|camel_to_snake ]]_acls"))
instance_id = db.Column(db.Integer, db.ForeignKey('[[ name|camel_to_snake ]].id'))
instance = db.relationship('[[ name ]]', backref=db.backref("[[ name|camel_to_snake|pluralize ]]"))
acl_type = db.Column(ChoiceType(SHARING_TYPE_TYPES), default=SHARING_TYPE_TYPES_TYPE_PUBLIC)
_read = db.Column(db.Boolean, default=True)
_update = db.Column(db.Boolean, default=True)
_delete = db.Column(db.Boolean, default=True)
[% for column in columns %]
[[ column.name ]]__read = db.Column(db.Boolean, default=True)
[[ column.name ]]__write = db.Column(db.Boolean, default=True)
[%- endfor %]

View File

@ -10,17 +10,10 @@ from flask_security import login_required
from oshipka.webapp import app
from oshipka.webapp.views import ModelView
from webapp.routes.[[ name|camel_to_snake ]]_hooks import *
[%- if name == "User" %]
from webapp.models import [[ name ]]
[[ name|camel_to_snake ]] = ModelView(app, [[name]])
[%- else %]
from webapp.models import [[ name ]], [[ name ]]Acl
[[ name|camel_to_snake ]] = ModelView(app, [[name]], [[ name ]]Acl)
[%- endif %]
[[ name|camel_to_snake ]] = ModelView(app, [[name]], [[ definitions ]])
[% for verb, verb_values in _verbs.items() %]
@ -31,18 +24,4 @@ from webapp.models import [[ name ]], [[ name ]]Acl
is_login_required=[[ verb_values.is_login_required if verb_values.is_login_required else 'False' ]],
the_roles_required=[[ verb_values.the_roles_required if verb_values.the_roles_required else '[]' ]],
)
[% endfor %]
@app.route("/[[ name|camel_to_snake|pluralize ]]/permissions")
@login_required
def [[ name|camel_to_snake ]]_model_permissions():
return render_template("[[ name|camel_to_snake ]]/permissions_model.html")
@app.route("/[[ name|camel_to_snake|pluralize ]]/<int:instance_id>/permissions")
@login_required
def [[ name|camel_to_snake ]]_instance_permissions(instance_id):
instance = [[ name ]].query.filter_by(id=instance_id).first()
if not instance:
abort(404)
return render_template("[[ name|camel_to_snake ]]/permissions_instance.html", instance=instance)
[% endfor %]

View File

@ -174,23 +174,15 @@ def process_model(view_model):
with open(os.path.join(MODELS_PATH, "_{}.py".format(model_snake)), 'w+') as f:
f.write(model)
if model_camel not in ['User']:
template_acl = env.get_template('model_acl_py')
model_acl = autopep8.fix_code(template_acl.render(**view_model), options=pep_options)
with open(os.path.join(MODELS_PATH, "_{}_acl.py".format(model_snake)), 'w+') as f:
f.write(model_acl)
public_model = os.path.join(MODELS_PATH, "{}.py".format(model_snake))
if not os.path.exists(public_model):
with open(public_model, 'w+') as f:
if model_camel not in ['User']:
f.write("from webapp.models._{}_acl import {}Acl\n".format(model_snake, model_camel))
f.write("from webapp.models._{} import {}\n".format(model_snake, model_camel))
def process_routes(view_model):
template = env.get_template('routes_py')
model = autopep8.fix_code(template.render(**view_model), options=pep_options)
model = autopep8.fix_code(template.render(definitions=view_model, **view_model), options=pep_options)
_model_name = view_model.get('name')
model_name_snake = camel_case_to_snake_case(_model_name.split('.yaml')[0])
filename = "{}.py".format(model_name_snake)
@ -233,11 +225,7 @@ def main():
process_routes(view_model)
view_model_name = view_model.get('name', '')
model_snake_name = camel_case_to_snake_case(view_model_name)
if view_model_name not in ['User']:
all_model_imports.append('from webapp.models.{s} import {c}, {c}Acl'.format(
s=model_snake_name, c=view_model_name))
else:
all_model_imports.append('from webapp.models.{s} import {c}'.format(
all_model_imports.append('from webapp.models.{s} import {c}'.format(
s=model_snake_name, c=view_model_name))
process_html_templates(view_model)
all_route_imports.append('from webapp.routes.{} import *'.format(