security conditional

This commit is contained in:
Daniel Tsvetkov 2021-05-09 14:44:11 +02:00
parent 517bdc32fe
commit b1b7b4017b
7 changed files with 131 additions and 116 deletions

View File

@ -25,3 +25,4 @@ MAKEDIRS = [
]
APP_BASE_URL = "http://localhost:5000"
SECURITY_ENABLED = True

View File

@ -27,7 +27,7 @@ from tww.lib import solve_query, resolve_timezone, dt_tz_translation, time_ago
from whooshalchemy import IndexService
from config import SQLALCHEMY_DATABASE_URI, MAKEDIRS, DATABASE_FILE, SEARCH_INDEX_PATH, STATIC_DATA_DIR, MEDIA_DIR, \
basepath
basepath, SECURITY_ENABLED
from oshipka.util.strings import camel_case_to_snake_case
from vm_gen.vm_gen import order_from_process_order
@ -61,11 +61,6 @@ class Datable(object):
updated_dt = db.Column(db.UnicodeText())
roles_users = db.Table('roles_users',
db.Column('user_id', db.Integer(), db.ForeignKey('user.id')),
db.Column('role_id', db.Integer(), db.ForeignKey('role.id')))
class ModelJsonEncoder(JSONEncoder):
def default(self, o):
if isinstance(o, datetime.datetime):
@ -168,30 +163,36 @@ class ModelController(ModelJsonEncoder):
return fields
class Role(db.Model, ModelController, RoleMixin):
name = db.Column(db.Unicode, unique=True)
description = db.Column(db.Unicode)
if SECURITY_ENABLED:
roles_users = db.Table('roles_users',
db.Column('user_id', db.Integer(), db.ForeignKey('user.id')),
db.Column('role_id', db.Integer(), db.ForeignKey('role.id')))
class User(db.Model, ModelController, UserMixin):
username = db.Column(db.Unicode, unique=True)
token = db.Column(db.Unicode)
active = db.Column(db.Boolean(), default=True)
timezone = db.Column(db.String, default='UTC')
tz_offset_seconds = db.Column(db.Integer, default=0)
locale = db.Column(db.String(4), default='en')
name = db.Column(db.Unicode)
profile_image_url = db.Column(db.String)
roles = db.relationship('Role', secondary=roles_users,
backref=db.backref('users', lazy='dynamic'))
class Role(db.Model, ModelController, RoleMixin):
name = db.Column(db.Unicode, unique=True)
description = db.Column(db.Unicode)
security = Security()
user_datastore = SQLAlchemyUserDatastore(db, User, Role)
class User(db.Model, ModelController, UserMixin):
username = db.Column(db.Unicode, unique=True)
token = db.Column(db.Unicode)
active = db.Column(db.Boolean(), default=True)
timezone = db.Column(db.String, default='UTC')
tz_offset_seconds = db.Column(db.Integer, default=0)
locale = db.Column(db.String(4), default='en')
name = db.Column(db.Unicode)
profile_image_url = db.Column(db.String)
roles = db.relationship('Role', secondary=roles_users,
backref=db.backref('users', lazy='dynamic'))
security = Security()
user_datastore = SQLAlchemyUserDatastore(db, User, Role)
def register_filters(app):
@ -337,7 +338,8 @@ def init_db(app):
db.init_app(app)
csrf.init_app(app)
migrate.init_app(app, db)
security.init_app(app, user_datastore)
if SECURITY_ENABLED:
security.init_app(app, user_datastore)
_init_translations(app)
register_filters(app)
@ -370,7 +372,7 @@ def populate_static(app):
sensitive = import_module("sensitive")
ordered_model_names = order_from_process_order('csv', STATIC_DATA_DIR)
for model_name in ordered_model_names:
if model_name in ['User', 'Role']:
if SECURITY_ENABLED and model_name in ['User', 'Role']:
model = eval(model_name)
else:
model = getattr(models, model_name)
@ -385,18 +387,8 @@ def populate_static(app):
row_updates[key] = sensitive_value
if row_updates:
row.update(row_updates)
if model_name == "User":
if 'role_names' in row:
role_names = row.pop('role_names')
else:
role_names = ""
user = user_datastore.create_user(**row)
for role_name in role_names.split(';'):
role = Role.query.filter_by(name=role_name).first()
user_datastore.add_role_to_user(user, role)
else:
instance = create_model(model, row)
db.session.add(instance)
instance = create_model(model, row)
db.session.add(instance)
db.session.commit()
print("Finished populating")

View File

@ -4,10 +4,9 @@ import requests
from flask import send_from_directory, redirect, request, url_for, session, jsonify
from flask_security import login_user
from oshipka.persistance import User, db
from oshipka.util.strings import random_string_generator
from oshipka.webapp import oshipka_bp, app
from config import MEDIA_DIR, APP_BASE_URL
from config import MEDIA_DIR, APP_BASE_URL, SECURITY_ENABLED
from sensitive import SSO_CLIENT_ID, SSO_CLIENT_SECRET
@ -16,70 +15,71 @@ from sensitive import SSO_CLIENT_ID, SSO_CLIENT_SECRET
def get_media(filepath):
return send_from_directory(MEDIA_DIR, filepath)
SSO_BASE_URL = 'http://sso.localhost:5008'
SSO_AUTH_URL = '/oidc/auth'
SSO_TOKEN_URL = '/oidc/token'
SSO_USERINFO_URL = "/endpoints/userinfo"
if SECURITY_ENABLED:
from oshipka.persistance import User, db
SSO_BASE_URL = 'http://sso.localhost:5008'
SSO_AUTH_URL = '/oidc/auth'
SSO_TOKEN_URL = '/oidc/token'
SSO_USERINFO_URL = "/endpoints/userinfo"
@app.route('/login')
@oshipka_bp.route('/sso')
def sso():
callback_url = APP_BASE_URL + url_for('oshipka_bp.oidc_callback')
state = request.referrer or url_for('home')
session['oidc_state'] = state
params = urllib.parse.urlencode({
'redirect_uri': callback_url,
'client_id': SSO_CLIENT_ID,
'state': state,
'scope': 'openid',
'response_type': 'code',
'nonce': random_string_generator(),
})
return redirect(SSO_BASE_URL + SSO_AUTH_URL + '?' + params)
@oshipka_bp.route('/oidc/callback')
def oidc_callback():
error = request.args.get('error')
if error:
return jsonify({"error": "from auth server: {}".format(error)}), 400
state = request.args.get('state')
session_state = session['oidc_state']
if state != session_state:
return jsonify({"error": "state is different from session state"}), 400
code = request.args.get('code')
response = requests.post(
SSO_BASE_URL + SSO_TOKEN_URL,
data={
'code': code,
@app.route('/login')
@oshipka_bp.route('/sso')
def sso():
callback_url = APP_BASE_URL + url_for('oshipka_bp.oidc_callback')
state = request.referrer or url_for('home')
session['oidc_state'] = state
params = urllib.parse.urlencode({
'redirect_uri': callback_url,
'client_id': SSO_CLIENT_ID,
'client_secret': SSO_CLIENT_SECRET,
'grant_type': 'authorization_code'
},
)
if response.status_code == 200:
response_json = response.json()
access_token = response_json.get('access_token')
response = requests.get(
SSO_BASE_URL + SSO_USERINFO_URL,
headers={
'Authorization': "Bearer {}".format(access_token)
'state': state,
'scope': 'openid',
'response_type': 'code',
'nonce': random_string_generator(),
})
return redirect(SSO_BASE_URL + SSO_AUTH_URL + '?' + params)
@oshipka_bp.route('/oidc/callback')
def oidc_callback():
error = request.args.get('error')
if error:
return jsonify({"error": "from auth server: {}".format(error)}), 400
state = request.args.get('state')
session_state = session['oidc_state']
if state != session_state:
return jsonify({"error": "state is different from session state"}), 400
code = request.args.get('code')
response = requests.post(
SSO_BASE_URL + SSO_TOKEN_URL,
data={
'code': code,
'client_id': SSO_CLIENT_ID,
'client_secret': SSO_CLIENT_SECRET,
'grant_type': 'authorization_code'
},
)
if response.status_code == 200:
response_json = response.json()
username = response_json.get('user', {}).get('username')
user = User.query.filter_by(username=username).first()
redirect_uri = url_for('home')
if not user:
user = User(username=username)
db.session.add(user)
db.session.commit()
login_user(user)
if 'oidc_state' in session:
redirect_uri = session['oidc_state']
del session['oidc_state']
return redirect(redirect_uri)
return response.json()
access_token = response_json.get('access_token')
response = requests.get(
SSO_BASE_URL + SSO_USERINFO_URL,
headers={
'Authorization': "Bearer {}".format(access_token)
},
)
if response.status_code == 200:
response_json = response.json()
username = response_json.get('user', {}).get('username')
user = User.query.filter_by(username=username).first()
redirect_uri = url_for('home')
if not user:
user = User(username=username)
db.session.add(user)
db.session.commit()
login_user(user)
if 'oidc_state' in session:
redirect_uri = session['oidc_state']
del session['oidc_state']
return redirect(redirect_uri)
return response.json()

View File

@ -257,7 +257,7 @@ def create_view(model_view, view_context_kwargs, is_login_required=False, the_ro
class ModelView(object):
def __init__(self, app, model, model_acl):
def __init__(self, app, model, model_acl=None):
self.app = app
self.model = model
self.model_acl = model_acl

View File

@ -6,7 +6,11 @@ from sqlalchemy_utils import ChoiceType
from werkzeug.security import generate_password_hash
[%- endif %]
class [[ name ]](db.Model, ModelController[% for inherit in interits %], [[ inherit ]][% endfor %]):
[% for inherit_import in inherits_imports %]
[[ inherit_import ]]
[% endfor %]
class [[ name ]](db.Model, ModelController[% for inherit in inherits %], [[ inherit ]][% endfor %]):
[%- include "_model_choice_header_py" %]
[%- include "_model_searchable_header_py" %]

View File

@ -6,10 +6,20 @@ Edit the hooks in webapp/routes/[[ name|camel_to_snake ]]_hooks.py instead
from oshipka.webapp import app
from oshipka.webapp.views import ModelView
from webapp.models import [[ name ]], [[ name ]]Acl
from webapp.routes.[[ name|camel_to_snake ]]_hooks import *
[%- if name == "User" %]
from webapp.models import [[ name ]]
[[ name|camel_to_snake ]] = ModelView(app, [[name]])
[%- else %]
from webapp.models import [[ name ]], [[ name ]]Acl
[[ name|camel_to_snake ]] = ModelView(app, [[name]], [[ name ]]Acl)
[%- endif %]
[% for verb, verb_values in _verbs.items() %]
[[ name|camel_to_snake ]].register_verb(view_context=[[ verb ]]_view_context,
verb="[[ verb ]]",

View File

@ -163,24 +163,28 @@ def process_navigation(view_models):
def process_model(view_model):
template = env.get_template('model_py')
view_model['acls'] = {}
for verb, acl in view_model['_verbs'].items():
view_model['acls'][verb] = {'authn': acl['is_login_required'], 'authz': acl['the_roles_required']}
model = autopep8.fix_code(template.render(**view_model), options=pep_options)
_model_name = view_model.get('name')
template_acl = env.get_template('model_acl_py')
model_acl = autopep8.fix_code(template_acl.render(**view_model), options=pep_options)
model_camel = _model_name.split('.yaml')[0]
model_snake = camel_case_to_snake_case(_model_name.split('.yaml')[0])
for verb, acl in view_model['_verbs'].items():
view_model['acls'][verb] = {'authn': acl['is_login_required'], 'authz': acl['the_roles_required']}
model = autopep8.fix_code(template.render(**view_model), options=pep_options)
with open(os.path.join(MODELS_PATH, "_{}.py".format(model_snake)), 'w+') as f:
f.write(model)
with open(os.path.join(MODELS_PATH, "_{}_acl.py".format(model_snake)), 'w+') as f:
f.write(model_acl)
if model_camel not in ['User']:
template_acl = env.get_template('model_acl_py')
model_acl = autopep8.fix_code(template_acl.render(**view_model), options=pep_options)
with open(os.path.join(MODELS_PATH, "_{}_acl.py".format(model_snake)), 'w+') as f:
f.write(model_acl)
public_model = os.path.join(MODELS_PATH, "{}.py".format(model_snake))
if not os.path.exists(public_model):
with open(public_model, 'w+') as f:
f.write("from webapp.models._{}_acl import {}Acl\n".format(model_snake, model_camel))
if model_camel not in ['User']:
f.write("from webapp.models._{}_acl import {}Acl\n".format(model_snake, model_camel))
f.write("from webapp.models._{} import {}\n".format(model_snake, model_camel))
@ -229,8 +233,12 @@ def main():
process_routes(view_model)
view_model_name = view_model.get('name', '')
model_snake_name = camel_case_to_snake_case(view_model_name)
all_model_imports.append('from webapp.models.{s} import {c}, {c}Acl'.format(
s=model_snake_name, c=view_model_name))
if view_model_name not in ['User']:
all_model_imports.append('from webapp.models.{s} import {c}, {c}Acl'.format(
s=model_snake_name, c=view_model_name))
else:
all_model_imports.append('from webapp.models.{s} import {c}'.format(
s=model_snake_name, c=view_model_name))
process_html_templates(view_model)
all_route_imports.append('from webapp.routes.{} import *'.format(
model_snake_name