265 lines
9.6 KiB
Python
265 lines
9.6 KiB
Python
import os
|
|
import shutil
|
|
import sys
|
|
|
|
import autopep8
|
|
import inflect
|
|
import yaml
|
|
from jinja2 import Environment, FileSystemLoader
|
|
|
|
from shared.shared import order_from_process_order
|
|
from oshipka.util.strings import snake_case_to_camel_case, camel_case_to_snake_case
|
|
|
|
pep_options = {'max_line_length': 120}
|
|
|
|
|
|
def _process_choice(column):
|
|
column_name = column.get('name', '')
|
|
column_upper = column_name.upper()
|
|
types_name = "{}_TYPES".format(column_upper)
|
|
choices = column.get('choices', {})
|
|
return {
|
|
'name': types_name,
|
|
'choices': choices,
|
|
}
|
|
|
|
|
|
def process_secondary(view_model, column_name):
|
|
model_name = camel_case_to_snake_case(view_model.get('name'))
|
|
secondary = "{}__{}".format(model_name, column_name)
|
|
return {
|
|
'name': secondary,
|
|
'columns': [{
|
|
'name': model_name
|
|
}, {
|
|
'name': column_name
|
|
}]
|
|
}
|
|
|
|
|
|
def _process_file(column):
|
|
column_type = column.get('type', '')
|
|
column_accept = column.get('accept', '')
|
|
if column_accept:
|
|
accept = column_accept
|
|
elif column_type in ['audio']:
|
|
accept = 'audio/*'
|
|
elif column_type in ['video']:
|
|
accept = 'video/*'
|
|
elif column_type in ['video']:
|
|
accept = 'video/*'
|
|
else:
|
|
accept = '*'
|
|
column["accept"] = accept
|
|
column["is_file"] = True
|
|
return column
|
|
|
|
|
|
VERBS = {
|
|
'get': {
|
|
'per_item': 'True',
|
|
'methods': ['GET'],
|
|
},
|
|
'list': {
|
|
'per_item': 'False',
|
|
'methods': ['GET'],
|
|
},
|
|
'table': {
|
|
'per_item': 'False',
|
|
'methods': ['GET'],
|
|
},
|
|
'search': {
|
|
'per_item': 'False',
|
|
'methods': ['GET'],
|
|
},
|
|
'create': {
|
|
'per_item': 'False',
|
|
'methods': ['GET', 'POST'],
|
|
},
|
|
'update': {
|
|
'per_item': 'True',
|
|
'methods': ['GET', 'POST'],
|
|
},
|
|
'delete': {
|
|
'per_item': 'True',
|
|
'methods': ['GET', 'POST'],
|
|
},
|
|
}
|
|
|
|
|
|
def enrich_view_model(view_model):
|
|
columns = []
|
|
for column in view_model.get('columns', {}):
|
|
column_name = column.get('name')
|
|
column_type = column.get('type')
|
|
if column_type in ['text', 'long_text', ]:
|
|
_column_type = 'db.UnicodeText'
|
|
elif column_type in ['number', 'int', 'integer', ]:
|
|
_column_type = 'db.Integer'
|
|
elif column_type in ['bool', 'boolean', ] or column_name.startswith('is_'):
|
|
_column_type = 'LiberalBoolean'
|
|
elif column_type in ['datetime', ] or column_name.endswith('_dt'):
|
|
_column_type = 'db.UnicodeText'
|
|
elif column_type in ['relationship', ]:
|
|
_column_type = 'relationship'
|
|
multiple = column.get('multiple')
|
|
if multiple:
|
|
if '_secondaries' not in view_model:
|
|
view_model['_secondaries'] = []
|
|
secondary_model = process_secondary(view_model, column_name)
|
|
column.update({'secondary': secondary_model})
|
|
view_model['_secondaries'].append(secondary_model)
|
|
elif column_type in ['choice', ]:
|
|
if '_choice_types' not in view_model:
|
|
view_model['_choice_types'] = []
|
|
_choices = _process_choice(column)
|
|
_column_type = 'ChoiceType({})'.format(_choices.get('name'))
|
|
view_model['_choice_types'].append(_choices)
|
|
elif column_type in ['file', 'audio', 'video', 'image', 'img', 'picture', ]:
|
|
column = _process_file(column)
|
|
_column_type = 'db.UnicodeText'.format()
|
|
else:
|
|
_column_type = 'db.UnicodeText'
|
|
column.update({'_type': _column_type})
|
|
columns.append(column)
|
|
view_model['columns'] = columns
|
|
|
|
view_model['_verbs'] = {}
|
|
for verb, verb_default in VERBS.items():
|
|
view_model['_verbs'][verb] = verb_default
|
|
|
|
access = view_model.get('access')
|
|
if access:
|
|
for acl in access:
|
|
verb = acl.get('verb')
|
|
if verb == 'all':
|
|
for verb in VERBS:
|
|
view_model['_verbs'][verb]['is_login_required'] = acl.get('login_required')
|
|
view_model['_verbs'][verb]['the_roles_required'] = acl.get('roles_required')
|
|
else:
|
|
is_login_required = acl.get('login_required')
|
|
if is_login_required is False: # overrides all
|
|
view_model['_verbs'][verb]['the_roles_required'] = acl.get('login_required')
|
|
view_model['_verbs'][verb]['is_login_required'] = is_login_required
|
|
view_model['_verbs'][verb]['the_roles_required'] = acl.get('roles_required')
|
|
return view_model
|
|
|
|
|
|
def process_navigation(view_models):
|
|
navigation_items = []
|
|
for view_model in view_models:
|
|
if view_model.get('show_in_nav'):
|
|
navigation_items.append(view_model)
|
|
template = env.get_template(os.path.join('html', 'navigation.html'))
|
|
rv = template.render(**{"navigation_items": navigation_items})
|
|
filepath = os.path.join(HTML_TEMPLATES_PATH, "navigation.html")
|
|
with open(filepath, 'w+') as f:
|
|
f.write(rv)
|
|
|
|
|
|
def process_model(view_model):
|
|
template = env.get_template('model_py')
|
|
model = autopep8.fix_code(template.render(**view_model), options=pep_options)
|
|
_model_name = view_model.get('name')
|
|
|
|
model_camel = _model_name.split('.yaml')[0]
|
|
model_snake = camel_case_to_snake_case(_model_name.split('.yaml')[0])
|
|
with open(os.path.join(MODELS_PATH, "_{}.py".format(model_snake)), 'w+') as f:
|
|
f.write(model)
|
|
public_model = os.path.join(MODELS_PATH, "{}.py".format(model_snake))
|
|
if not os.path.exists(public_model):
|
|
with open(public_model, 'w+') as f:
|
|
f.write("from webapp.models._{} import {}".format(model_snake, model_camel))
|
|
|
|
|
|
def process_routes(view_model):
|
|
template = env.get_template('routes_py')
|
|
model = autopep8.fix_code(template.render(**view_model), options=pep_options)
|
|
_model_name = view_model.get('name')
|
|
model_name_snake = camel_case_to_snake_case(_model_name.split('.yaml')[0])
|
|
filename = "{}.py".format(model_name_snake)
|
|
with open(os.path.join(ROUTES_PATH, filename), 'w+') as f:
|
|
f.write(model)
|
|
route_hooks_filename = os.path.join(ROUTES_PATH, "{}_hooks.py".format(model_name_snake))
|
|
if not os.path.exists(route_hooks_filename):
|
|
src_path = os.path.join(VM_TEMPLATES_PATH, "_hooks.py")
|
|
shutil.copy(src_path, route_hooks_filename)
|
|
|
|
|
|
def process_html_templates(view_model):
|
|
_model_name_snake = camel_case_to_snake_case(view_model.get('name'))
|
|
templates_dir = os.path.join(HTML_TEMPLATES_PATH, _model_name_snake)
|
|
if not os.path.exists(templates_dir):
|
|
os.makedirs(templates_dir)
|
|
|
|
for filename in os.listdir(os.path.join(VM_TEMPLATES_PATH, "html")):
|
|
filepath = os.path.join(templates_dir, filename)
|
|
if not filename.startswith("_") and os.path.exists(filepath):
|
|
continue
|
|
template = env.get_template(os.path.join('html', filename))
|
|
rv = template.render(**view_model)
|
|
with open(filepath, 'w+') as f:
|
|
f.write(rv)
|
|
|
|
|
|
def main():
|
|
all_model_imports = ['from oshipka.persistance import db']
|
|
all_route_imports, all_view_models = [], []
|
|
view_model_names = order_from_process_order('yaml', VIEW_MODELS_PATH)
|
|
for view_model_name in view_model_names:
|
|
with open(os.path.join(VIEW_MODELS_PATH, "{}.yaml".format(view_model_name)), 'r') as stream:
|
|
try:
|
|
view_models = yaml.safe_load_all(stream)
|
|
for view_model in view_models:
|
|
all_view_models.append(view_model)
|
|
view_model = enrich_view_model(view_model)
|
|
process_model(view_model)
|
|
process_routes(view_model)
|
|
view_model_name = view_model.get('name', '')
|
|
model_snake_name = camel_case_to_snake_case(view_model_name)
|
|
all_model_imports.append('from webapp.models.{} import {}'.format(
|
|
model_snake_name, view_model_name))
|
|
process_html_templates(view_model)
|
|
all_route_imports.append('from webapp.routes.{} import *'.format(
|
|
model_snake_name
|
|
))
|
|
except yaml.YAMLError as e:
|
|
breakpoint()
|
|
process_navigation(all_view_models)
|
|
all_model_imports = autopep8.fix_code('\n'.join(all_model_imports), options=pep_options)
|
|
all_route_imports = autopep8.fix_code('\n'.join(all_route_imports), options=pep_options)
|
|
with open(os.path.join(MODELS_PATH, "__init__.py"), 'w+') as f:
|
|
f.write(all_model_imports)
|
|
with open(os.path.join(ROUTES_PATH, "__init__.py"), 'w+') as f:
|
|
f.write(all_route_imports)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
basepath = sys.argv[1]
|
|
oshipka_path = os.environ.get('OSHIPKA_PATH')
|
|
|
|
VM_TEMPLATES_PATH = os.path.join(oshipka_path, "vm_gen", "templates")
|
|
|
|
WEBAPP_PATH = os.path.join(basepath, "webapp")
|
|
VIEW_MODELS_PATH = os.path.join(WEBAPP_PATH, "view_models")
|
|
MODELS_PATH = os.path.join(WEBAPP_PATH, "models")
|
|
ROUTES_PATH = os.path.join(WEBAPP_PATH, "routes")
|
|
for d in [VIEW_MODELS_PATH, MODELS_PATH, ROUTES_PATH, ]:
|
|
os.makedirs(d, exist_ok=True)
|
|
HTML_TEMPLATES_PATH = os.path.join(WEBAPP_PATH, "templates")
|
|
|
|
env = Environment(
|
|
loader=FileSystemLoader(searchpath=VM_TEMPLATES_PATH),
|
|
block_start_string='[%',
|
|
block_end_string='%]',
|
|
variable_start_string='[[',
|
|
variable_end_string=']]'
|
|
)
|
|
env.filters['snake_to_camel'] = snake_case_to_camel_case
|
|
env.filters['camel_to_snake'] = camel_case_to_snake_case
|
|
|
|
p = inflect.engine()
|
|
env.filters['pluralize'] = p.plural
|
|
|
|
main()
|