"""Generate models, routes and HTML templates for a webapp from YAML view models.

Run as a script: ``<script> <basepath>`` with the ``OSHIPKA_PATH`` environment
variable set. The path constants (``MODELS_PATH``, ``ROUTES_PATH``, ...) and the
Jinja2 ``env`` used by the functions below are module globals that are only
defined when the file is executed as ``__main__``.
"""
import copy
import os
import shutil
import sys

import autopep8
import inflect
import yaml
from jinja2 import Environment, FileSystemLoader

from shared.shared import order_from_process_order
from oshipka.util.strings import snake_case_to_camel_case, camel_case_to_snake_case

pep_options = {'max_line_length': 120}


def _process_choice(column):
    """Return the choice-type descriptor for a ``choice`` column.

    The descriptor name is the upper-cased column name followed by ``_TYPES``;
    the choices are taken verbatim from the column (``{}`` when absent).
    """
    types_name = "{}_TYPES".format(column.get('name', '').upper())
    return {
        'name': types_name,
        'choices': column.get('choices', {}),
    }


def process_secondary(view_model, column_name):
    """Describe the secondary (association) table for a ``multiple`` relationship.

    The table is named ``<model>__<column_name>`` where ``<model>`` is the view
    model name passed through ``camel_case_to_snake_case``; it has one column
    for each side.
    """
    model_name = camel_case_to_snake_case(view_model.get('name'))
    secondary = "{}__{}".format(model_name, column_name)
    return {
        'name': secondary,
        'columns': [{
            'name': model_name
        }, {
            'name': column_name
        }]
    }


def _process_file(column):
    """Mark ``column`` as a file column and fill in its ``accept`` value.

    An explicit, non-empty ``accept`` on the column wins. Otherwise the value
    is derived from the column type: ``audio/*``, ``video/*`` or ``image/*``,
    falling back to ``*``. The column dict is mutated in place and returned.
    """
    column_type = column.get('type', '')
    column_accept = column.get('accept', '')
    if column_accept:
        accept = column_accept
    elif column_type in ['audio']:
        accept = 'audio/*'
    elif column_type in ['video']:
        accept = 'video/*'
    elif column_type in ['image', 'img', 'picture']:
        # This branch used to be an unreachable duplicate of the 'video' check,
        # which left image columns with the catch-all '*'.
        accept = 'image/*'
    else:
        accept = '*'
    column["accept"] = accept
    column["is_file"] = True
    return column


# Default settings for every verb a view model exposes. NOTE: 'per_item' is
# deliberately kept as the strings 'True'/'False', not booleans.
VERBS = {
    'get': {
        'per_item': 'True',
        'methods': ['GET'],
    },
    'list': {
        'per_item': 'False',
        'methods': ['GET'],
    },
    'table': {
        'per_item': 'False',
        'methods': ['GET'],
    },
    'search': {
        'per_item': 'False',
        'methods': ['GET'],
    },
    'create': {
        'per_item': 'False',
        'methods': ['GET', 'POST'],
    },
    'update': {
        'per_item': 'True',
        'methods': ['GET', 'POST'],
    },
    'delete': {
        'per_item': 'True',
        'methods': ['GET', 'POST'],
    },
}


def enrich_view_model(view_model):
    """Add the derived fields the templates need to ``view_model``.

    For every column a ``_type`` entry is added. The view model additionally
    gains ``_secondaries`` (for ``multiple`` relationships), ``_choice_types``
    (for ``choice`` columns) and ``_verbs`` (a private copy of ``VERBS`` with
    the ``access`` rules applied). The dict is mutated in place and returned.

    Raises:
        KeyError: if an ``access`` rule names a verb that is neither ``all``
            nor a key of ``VERBS``.
    """
    columns = []
    for column in view_model.get('columns') or []:
        column_name = column.get('name')
        column_type = column.get('type')
        # Guard the name-based heuristics against a column without a name.
        name_hint = column_name or ''
        # The order of this chain matters: the `is_` / `_dt` name heuristics
        # are checked before the relationship/choice/file types.
        if column_type in ['text', 'long_text', ]:
            _column_type = 'db.UnicodeText'
        elif column_type in ['number', 'int', 'integer', ]:
            _column_type = 'db.Integer'
        elif column_type in ['bool', 'boolean', ] or name_hint.startswith('is_'):
            _column_type = 'LiberalBoolean'
        elif column_type in ['datetime', ] or name_hint.endswith('_dt'):
            _column_type = 'db.UnicodeText'
        elif column_type in ['relationship', ]:
            _column_type = 'relationship'
            if column.get('multiple'):
                secondary_model = process_secondary(view_model, column_name)
                column.update({'secondary': secondary_model})
                view_model.setdefault('_secondaries', []).append(secondary_model)
        elif column_type in ['choice', ]:
            _choices = _process_choice(column)
            _column_type = 'ChoiceType({})'.format(_choices.get('name'))
            view_model.setdefault('_choice_types', []).append(_choices)
        elif column_type in ['file', 'audio', 'video', 'image', 'img', 'picture', ]:
            column = _process_file(column)
            _column_type = 'db.UnicodeText'
        else:
            _column_type = 'db.UnicodeText'
        column.update({'_type': _column_type})
        columns.append(column)
    view_model['columns'] = columns

    # Deep copy: the ACL settings written below must not leak into the shared
    # VERBS defaults, and through them into every other view model.
    view_model['_verbs'] = copy.deepcopy(VERBS)
    for acl in view_model.get('access') or []:
        verb = acl.get('verb')
        # A rule for 'all' applies to every known verb; rules are applied in
        # order, so a later verb-specific rule overrides an earlier 'all'.
        targets = list(VERBS) if verb == 'all' else [verb]
        for target in targets:
            settings = view_model['_verbs'][target]
            settings['is_login_required'] = acl.get('login_required')
            settings['the_roles_required'] = acl.get('roles_required', [])
    return view_model


def process_navigation(view_models):
    """Render ``html/navigation.html`` for the view models flagged ``show_in_nav``.

    The result is written to ``navigation.html`` in ``HTML_TEMPLATES_PATH``.
    """
    navigation_items = [vm for vm in view_models if vm.get('show_in_nav')]
    # Jinja2 template names always use '/', whatever the OS path separator is.
    template = env.get_template('html/navigation.html')
    rv = template.render(**{"navigation_items": navigation_items})
    filepath = os.path.join(HTML_TEMPLATES_PATH, "navigation.html")
    with open(filepath, 'w') as f:
        f.write(rv)


def process_model(view_model):
    """Generate the model module(s) for an enriched view model.

    ``_<model>.py`` in ``MODELS_PATH`` is always regenerated from the
    ``model_py`` template. The public ``<model>.py``, which only re-exports the
    class, is written once and never overwritten afterwards.

    Also sets ``view_model['acls']``: per verb, ``authn`` is the login
    requirement and ``authz`` the required roles. Verbs that no access rule
    touched get ``None`` and ``[]``.
    """
    template = env.get_template('model_py')
    view_model['acls'] = {
        verb: {
            'authn': acl.get('is_login_required'),
            'authz': acl.get('the_roles_required', []),
        }
        for verb, acl in view_model['_verbs'].items()
    }
    model = autopep8.fix_code(template.render(**view_model), options=pep_options)
    model_camel = view_model.get('name').split('.yaml')[0]
    model_snake = camel_case_to_snake_case(model_camel)
    with open(os.path.join(MODELS_PATH, "_{}.py".format(model_snake)), 'w') as f:
        f.write(model)
    public_model = os.path.join(MODELS_PATH, "{}.py".format(model_snake))
    if not os.path.exists(public_model):
        with open(public_model, 'w') as f:
            f.write("from webapp.models._{} import {}".format(model_snake, model_camel))


def process_routes(view_model):
    """Generate the routes module for a view model.

    ``<model>.py`` in ``ROUTES_PATH`` is always regenerated from the
    ``routes_py`` template. ``<model>_hooks.py`` is copied from the
    ``_hooks.py`` template only when it does not exist yet.
    """
    template = env.get_template('routes_py')
    routes = autopep8.fix_code(template.render(**view_model), options=pep_options)
    model_name_snake = camel_case_to_snake_case(view_model.get('name').split('.yaml')[0])
    with open(os.path.join(ROUTES_PATH, "{}.py".format(model_name_snake)), 'w') as f:
        f.write(routes)
    route_hooks_filename = os.path.join(ROUTES_PATH, "{}_hooks.py".format(model_name_snake))
    if not os.path.exists(route_hooks_filename):
        src_path = os.path.join(VM_TEMPLATES_PATH, "_hooks.py")
        shutil.copy(src_path, route_hooks_filename)


def process_html_templates(view_model):
    """Render every template in the generator's ``html`` directory for a view model.

    Output goes to ``HTML_TEMPLATES_PATH/<model>/``. Files whose name starts
    with ``_`` are always regenerated; any other file is left alone once it
    exists.
    """
    _model_name_snake = camel_case_to_snake_case(view_model.get('name'))
    templates_dir = os.path.join(HTML_TEMPLATES_PATH, _model_name_snake)
    os.makedirs(templates_dir, exist_ok=True)
    for filename in os.listdir(os.path.join(VM_TEMPLATES_PATH, "html")):
        filepath = os.path.join(templates_dir, filename)
        if not filename.startswith("_") and os.path.exists(filepath):
            continue
        # Jinja2 template names always use '/', whatever the OS path separator is.
        template = env.get_template('html/{}'.format(filename))
        rv = template.render(**view_model)
        with open(filepath, 'w') as f:
            f.write(rv)


def main():
    """Generate everything for each YAML view model found in ``VIEW_MODELS_PATH``.

    The file names come from ``order_from_process_order('yaml', VIEW_MODELS_PATH)``
    and a file may hold several YAML documents. After all view models are
    processed, the navigation template and the ``__init__.py`` of the models
    and routes packages are written.

    Raises:
        SystemExit: if a view model file is not valid YAML.
    """
    all_model_imports = ['from oshipka.persistance import db']
    all_route_imports, all_view_models = [], []
    view_model_names = order_from_process_order('yaml', VIEW_MODELS_PATH)
    for view_model_name in view_model_names:
        yaml_path = os.path.join(VIEW_MODELS_PATH, "{}.yaml".format(view_model_name))
        with open(yaml_path, 'r') as stream:
            try:
                # safe_load_all is lazy; materialise it here so that parse
                # errors surface inside this try block.
                documents = list(yaml.safe_load_all(stream))
            except yaml.YAMLError as e:
                sys.exit("Invalid YAML in {}: {}".format(yaml_path, e))
        for view_model in documents:
            if not view_model:
                # Empty YAML documents load as None; nothing to generate.
                continue
            all_view_models.append(view_model)
            view_model = enrich_view_model(view_model)
            process_model(view_model)
            process_routes(view_model)
            model_name = view_model.get('name', '')
            model_snake_name = camel_case_to_snake_case(model_name)
            all_model_imports.append('from webapp.models.{} import {}'.format(
                model_snake_name, model_name))
            process_html_templates(view_model)
            all_route_imports.append('from webapp.routes.{} import *'.format(
                model_snake_name
            ))
    process_navigation(all_view_models)
    all_model_imports = autopep8.fix_code('\n'.join(all_model_imports), options=pep_options)
    all_route_imports = autopep8.fix_code('\n'.join(all_route_imports), options=pep_options)
    with open(os.path.join(MODELS_PATH, "__init__.py"), 'w') as f:
        f.write(all_model_imports)
    with open(os.path.join(ROUTES_PATH, "__init__.py"), 'w') as f:
        f.write(all_route_imports)


if __name__ == "__main__":
    if len(sys.argv) < 2:
        sys.exit("usage: {} <basepath>".format(sys.argv[0]))
    basepath = sys.argv[1]
    oshipka_path = os.environ.get('OSHIPKA_PATH')
    if not oshipka_path:
        sys.exit("The OSHIPKA_PATH environment variable must be set")

    VM_TEMPLATES_PATH = os.path.join(oshipka_path, "vm_gen", "templates")
    WEBAPP_PATH = os.path.join(basepath, "webapp")
    VIEW_MODELS_PATH = os.path.join(WEBAPP_PATH, "view_models")
    MODELS_PATH = os.path.join(WEBAPP_PATH, "models")
    ROUTES_PATH = os.path.join(WEBAPP_PATH, "routes")
    HTML_TEMPLATES_PATH = os.path.join(WEBAPP_PATH, "templates")
    # The templates directory is created here as well, so writing
    # navigation.html works even when there are no view models at all.
    for d in [VIEW_MODELS_PATH, MODELS_PATH, ROUTES_PATH, HTML_TEMPLATES_PATH, ]:
        os.makedirs(d, exist_ok=True)

    # Non-default delimiters: blocks are [% ... %] and variables are [[ ... ]].
    env = Environment(
        loader=FileSystemLoader(searchpath=VM_TEMPLATES_PATH),
        block_start_string='[%',
        block_end_string='%]',
        variable_start_string='[[',
        variable_end_string=']]'
    )
    env.filters['snake_to_camel'] = snake_case_to_camel_case
    env.filters['camel_to_snake'] = camel_case_to_snake_case

    p = inflect.engine()
    env.filters['pluralize'] = p.plural

    main()