oshipka/vm_gen/vm_gen.py
2021-05-11 00:14:25 +02:00

273 lines
9.9 KiB
Python

import os
import shutil
import sys
import autopep8
import inflect
import yaml
from jinja2 import Environment, FileSystemLoader
from shared.shared import order_from_process_order
from oshipka.util.strings import snake_case_to_camel_case, camel_case_to_snake_case
pep_options = {'max_line_length': 120}
def _process_choice(column):
column_name = column.get('name', '')
column_upper = column_name.upper()
types_name = "{}_TYPES".format(column_upper)
choices = column.get('choices', {})
return {
'name': types_name,
'choices': choices,
}
def process_secondary(view_model, column_name):
model_name = camel_case_to_snake_case(view_model.get('name'))
secondary = "{}__{}".format(model_name, column_name)
return {
'name': secondary,
'columns': [{
'name': model_name
}, {
'name': column_name
}]
}
def _process_file(column):
column_type = column.get('type', '')
column_accept = column.get('accept', '')
if column_accept:
accept = column_accept
elif column_type in ['audio']:
accept = 'audio/*'
elif column_type in ['video']:
accept = 'video/*'
elif column_type in ['video']:
accept = 'video/*'
else:
accept = '*'
column["accept"] = accept
column["is_file"] = True
return column
VERBS = {
'get': {
'per_item': 'True',
'methods': ['GET'],
},
'list': {
'per_item': 'False',
'methods': ['GET'],
},
'table': {
'per_item': 'False',
'methods': ['GET'],
},
'search': {
'per_item': 'False',
'methods': ['GET'],
},
'create': {
'per_item': 'False',
'methods': ['GET', 'POST'],
},
'update': {
'per_item': 'True',
'methods': ['GET', 'POST'],
},
'delete': {
'per_item': 'True',
'methods': ['GET', 'POST'],
},
}
def enrich_view_model(view_model):
columns = []
for column in view_model.get('columns', {}):
column_name = column.get('name')
column_type = column.get('type')
if column_type in ['text', 'long_text', ]:
_column_type = 'db.UnicodeText'
elif column_type in ['password', ]:
_column_type = 'db.UnicodeText'
view_model['_password_types'] = True
elif column_type in ['number', 'int', 'integer', ]:
_column_type = 'db.Integer'
elif column_type in ['bool', 'boolean', ] or column_name.startswith('is_'):
_column_type = 'LiberalBoolean'
elif column_type in ['datetime', ] or column_name.endswith('_dt'):
_column_type = 'db.UnicodeText'
elif column_type in ['relationship', ]:
_column_type = 'relationship'
multiple = column.get('multiple')
if multiple:
if '_secondaries' not in view_model:
view_model['_secondaries'] = []
secondary_model = process_secondary(view_model, column_name)
column.update({'secondary': secondary_model})
view_model['_secondaries'].append(secondary_model)
elif column_type in ['choice', ]:
if '_choice_types' not in view_model:
view_model['_choice_types'] = []
_choices = _process_choice(column)
_column_type = 'ChoiceType({})'.format(_choices.get('name'))
view_model['_choice_types'].append(_choices)
elif column_type in ['file', 'audio', 'video', 'image', 'img', 'picture', ]:
column = _process_file(column)
_column_type = 'db.UnicodeText'.format()
else:
_column_type = 'db.UnicodeText'
column.update({'_type': _column_type})
columns.append(column)
view_model['columns'] = columns
view_model['_verbs'] = {}
for verb, verb_default in VERBS.items():
view_model['_verbs'][verb] = verb_default
access = view_model.get('access')
if access:
for acl in access:
verb = acl.get('verb')
if verb == 'all':
for verb in VERBS:
view_model['_verbs'][verb]['is_login_required'] = acl.get('login_required')
view_model['_verbs'][verb]['the_roles_required'] = acl.get('roles_required', [])
else:
is_login_required = acl.get('login_required')
if is_login_required is False: # overrides all
view_model['_verbs'][verb]['the_roles_required'] = acl.get('login_required')
view_model['_verbs'][verb]['is_login_required'] = is_login_required
view_model['_verbs'][verb]['the_roles_required'] = acl.get('roles_required', [])
return view_model
def process_navigation(view_models):
navigation_items = []
for view_model in view_models:
if view_model.get('show_in_nav'):
navigation_items.append(view_model)
template = env.get_template(os.path.join('html', 'navigation.html'))
rv = template.render(**{"navigation_items": navigation_items})
filepath = os.path.join(HTML_TEMPLATES_PATH, "navigation.html")
with open(filepath, 'w+') as f:
f.write(rv)
def process_model(view_model):
template = env.get_template('model_py')
view_model['acls'] = {}
_model_name = view_model.get('name')
model_camel = _model_name.split('.yaml')[0]
model_snake = camel_case_to_snake_case(_model_name.split('.yaml')[0])
for verb, acl in view_model['_verbs'].items():
view_model['acls'][verb] = {'authn': acl['is_login_required'], 'authz': acl['the_roles_required']}
model = autopep8.fix_code(template.render(**view_model), options=pep_options)
with open(os.path.join(MODELS_PATH, "_{}.py".format(model_snake)), 'w+') as f:
f.write(model)
public_model = os.path.join(MODELS_PATH, "{}.py".format(model_snake))
if not os.path.exists(public_model):
with open(public_model, 'w+') as f:
f.write("from webapp.models._{} import {}\n".format(model_snake, model_camel))
def process_routes(view_model):
template = env.get_template('routes_py')
model = autopep8.fix_code(template.render(definitions=view_model, **view_model), options=pep_options)
_model_name = view_model.get('name')
model_name_snake = camel_case_to_snake_case(_model_name.split('.yaml')[0])
filename = "{}.py".format(model_name_snake)
with open(os.path.join(ROUTES_PATH, filename), 'w+') as f:
f.write(model)
route_hooks_filename = os.path.join(ROUTES_PATH, "{}_hooks.py".format(model_name_snake))
if not os.path.exists(route_hooks_filename):
src_path = os.path.join(VM_TEMPLATES_PATH, "_hooks.py")
shutil.copy(src_path, route_hooks_filename)
def process_html_templates(view_model):
_model_name_snake = camel_case_to_snake_case(view_model.get('name'))
templates_dir = os.path.join(HTML_TEMPLATES_PATH, _model_name_snake)
if not os.path.exists(templates_dir):
os.makedirs(templates_dir)
for filename in os.listdir(os.path.join(VM_TEMPLATES_PATH, "html")):
filepath = os.path.join(templates_dir, filename)
if not filename.startswith("_") and os.path.exists(filepath):
continue
template = env.get_template(os.path.join('html', filename))
rv = template.render(**view_model)
with open(filepath, 'w+') as f:
f.write(rv)
def main():
all_model_imports = ['from oshipka.persistance import db']
all_route_imports, all_view_models = [], []
view_model_names = order_from_process_order('yaml', VIEW_MODELS_PATH)
for view_model_name in view_model_names:
with open(os.path.join(VIEW_MODELS_PATH, "{}.yaml".format(view_model_name)), 'r') as stream:
try:
view_models = yaml.safe_load_all(stream)
for view_model in view_models:
all_view_models.append(view_model)
view_model = enrich_view_model(view_model)
process_model(view_model)
process_routes(view_model)
view_model_name = view_model.get('name', '')
model_snake_name = camel_case_to_snake_case(view_model_name)
all_model_imports.append('from webapp.models.{s} import {c}'.format(
s=model_snake_name, c=view_model_name))
process_html_templates(view_model)
all_route_imports.append('from webapp.routes.{} import *'.format(
model_snake_name
))
except yaml.YAMLError as e:
breakpoint()
process_navigation(all_view_models)
all_model_imports = autopep8.fix_code('\n'.join(all_model_imports), options=pep_options)
all_route_imports = autopep8.fix_code('\n'.join(all_route_imports), options=pep_options)
with open(os.path.join(MODELS_PATH, "__init__.py"), 'w+') as f:
f.write(all_model_imports)
with open(os.path.join(ROUTES_PATH, "__init__.py"), 'w+') as f:
f.write(all_route_imports)
if __name__ == "__main__":
basepath = sys.argv[1]
oshipka_path = os.environ.get('OSHIPKA_PATH')
VM_TEMPLATES_PATH = os.path.join(oshipka_path, "vm_gen", "templates")
WEBAPP_PATH = os.path.join(basepath, "webapp")
VIEW_MODELS_PATH = os.path.join(WEBAPP_PATH, "view_models")
MODELS_PATH = os.path.join(WEBAPP_PATH, "models")
ROUTES_PATH = os.path.join(WEBAPP_PATH, "routes")
for d in [VIEW_MODELS_PATH, MODELS_PATH, ROUTES_PATH, ]:
os.makedirs(d, exist_ok=True)
HTML_TEMPLATES_PATH = os.path.join(WEBAPP_PATH, "templates")
env = Environment(
loader=FileSystemLoader(searchpath=VM_TEMPLATES_PATH),
block_start_string='[%',
block_end_string='%]',
variable_start_string='[[',
variable_end_string=']]'
)
env.filters['snake_to_camel'] = snake_case_to_camel_case
env.filters['camel_to_snake'] = camel_case_to_snake_case
p = inflect.engine()
env.filters['pluralize'] = p.plural
main()