oshipka/oshipka/webapp/default_routes.py

90 lines
3.2 KiB
Python

import urllib
import requests
from flask import send_from_directory, redirect, request, url_for, session, jsonify
from flask_security import login_user
from oshipka.util.strings import random_string_generator
from oshipka.webapp import oshipka_bp, app
from config import MEDIA_DIR, APP_BASE_URL, SECURITY_ENABLED, SSO_BASE_URL, SSO_CLIENT_ID
from sensitive import SSO_CLIENT_SECRET
# TODO: VULNZ - EVERYONE HAS ACCESS TO THIS
@oshipka_bp.route('/media/<path:filepath>')
def get_media(filepath):
    """Serve the file at ``filepath`` from ``MEDIA_DIR``.

    No authentication or authorization check is performed in this view
    (see the TODO above).
    """
    media_response = send_from_directory(MEDIA_DIR, filepath)
    return media_response
if SECURITY_ENABLED:
    # ``import urllib`` alone does not guarantee the ``parse`` submodule is
    # loaded; import it explicitly instead of relying on another package
    # having imported it as a side effect.
    import urllib.parse

    from oshipka.persistance import User, db

    app.config['SSO_BASE_URL'] = SSO_BASE_URL

    # Endpoint paths on the SSO server, appended to SSO_BASE_URL.
    SSO_AUTH_URL = '/oidc/auth'
    SSO_TOKEN_URL = '/oidc/token'
    SSO_USERINFO_URL = "/endpoints/userinfo"

    # Upper bound (seconds) for each server-to-server call to the SSO server,
    # so a stalled SSO server cannot block a worker indefinitely.
    SSO_REQUEST_TIMEOUT = 10

    def _sso_error(message, status):
        """Build a JSON error response of the form ``{"error": message}``."""
        return jsonify({"error": message}), status

    @app.route('/login')
    @oshipka_bp.route('/sso')
    def sso():
        """Start the login flow by redirecting to the SSO authorization endpoint.

        A ``state`` value of the form ``<return url>|<random string>`` is
        stored in the session under ``oidc_state`` and sent along with the
        request; ``oidc_callback`` compares the two and uses the part before
        ``|`` as the post-login redirect target.

        Returns:
            A redirect response to ``SSO_BASE_URL + SSO_AUTH_URL``.
        """
        callback_url = APP_BASE_URL + url_for('oshipka_bp.oidc_callback')
        # Parenthesised on purpose: ``a or b + c`` parses as ``a or (b + c)``,
        # which would drop the random suffix whenever a referrer is present
        # and leave the state predictable.
        return_to = request.referrer or url_for('home')
        state = return_to + "|" + random_string_generator()
        session['oidc_state'] = state
        params = urllib.parse.urlencode({
            'redirect_uri': callback_url,
            'client_id': SSO_CLIENT_ID,
            'state': state,
            'scope': 'openid',
            'response_type': 'code',
            'nonce': random_string_generator(),
        })
        return redirect(SSO_BASE_URL + SSO_AUTH_URL + '?' + params)

    @oshipka_bp.route('/oidc/callback')
    def oidc_callback():
        """Finish the login flow after the SSO server redirects back.

        Steps: validate ``state`` against the session, exchange ``code`` for
        an access token, fetch the user info, create the local ``User`` if it
        does not exist yet, log the user in, and redirect to the URL recorded
        in the state.

        Returns:
            A redirect on success; a JSON ``{"error": ...}`` body with status
            400 for bad callback parameters or 502 when the SSO server is
            unreachable or returns unusable data; the SSO server's own
            response body with status 502 when it answers with a non-200
            status.
        """
        error = request.args.get('error')
        if error:
            return jsonify({"error": "from auth server: {}".format(error)}), 400

        # Pop rather than read: the state is single-use, and a callback hit
        # without a prior /sso visit must yield a 400 instead of a KeyError.
        session_state = session.pop('oidc_state', None)
        state = request.args.get('state')
        if not session_state or state != session_state:
            return jsonify({"error": "state is different from session state"}), 400

        code = request.args.get('code')
        if not code:
            return _sso_error("missing authorization code", 400)

        # NOTE(security): verify=False disables TLS certificate verification
        # for requests that carry the client secret and the access token.
        # Kept as in the original (the SSO server may use a self-signed
        # certificate), but this should be replaced by a CA bundle path.
        try:
            token_response = requests.post(
                SSO_BASE_URL + SSO_TOKEN_URL,
                data={
                    'code': code,
                    'client_id': SSO_CLIENT_ID,
                    'client_secret': SSO_CLIENT_SECRET,
                    'grant_type': 'authorization_code'
                },
                verify=False,
                timeout=SSO_REQUEST_TIMEOUT,
            )
        except requests.RequestException:
            return _sso_error("could not reach auth server token endpoint", 502)
        if token_response.status_code != 200:
            # Pass the upstream body through, but not with a 200 status.
            return token_response.text, 502

        try:
            access_token = token_response.json().get('access_token')
        except (ValueError, AttributeError):
            # Body was not JSON, or not a JSON object.
            access_token = None
        if not access_token:
            return _sso_error("auth server returned no access token", 502)

        try:
            userinfo_response = requests.get(
                SSO_BASE_URL + SSO_USERINFO_URL,
                headers={
                    'Authorization': "Bearer {}".format(access_token)
                },
                verify=False,
                timeout=SSO_REQUEST_TIMEOUT,
            )
        except requests.RequestException:
            return _sso_error("could not reach auth server userinfo endpoint", 502)
        if userinfo_response.status_code != 200:
            return userinfo_response.text, 502

        try:
            # ``or {}`` also covers an explicit ``"user": null``.
            username = (userinfo_response.json().get('user') or {}).get('username')
        except (ValueError, AttributeError):
            username = None
        if not username:
            # Never look up, create, or log in a user with an empty username.
            return _sso_error("auth server returned no username", 502)

        user = User.query.filter_by(username=username).first()
        if not user:
            user = User(username=username, token=access_token)
            db.session.add(user)
            db.session.commit()
        login_user(user)

        # The part of the state before the first "|" is the return URL.
        redirect_uri = session_state.split('|')[0] or url_for('home')
        return redirect(redirect_uri)