1
0
angie-conv-image/j2cfg/j2cfg/__init__.py
2024-09-17 14:11:00 +03:00

267 lines
9.1 KiB
Python

import importlib
import json
import os
import os.path
import sys
import jinja2
import wcmatch.wcmatch
import yaml
from .functions import *
from .settings import *
# File name extensions (without the leading dot) used to build the glob
# pattern when the configuration directories are scanned for config files.
J2CFG_CONFIG_EXT = ['yml', 'yaml', 'json']
class J2cfg:
    """Render Jinja2 templates using a configuration merged from files.

    On construction the configuration is collected from YAML/JSON files into
    ``self.kwargs['j2cfg']``.  Unless ``dump_only`` is set, a Jinja2
    environment is prepared as well, so that :meth:`render_file` can be used.

    Several settings fall back to environment variables when the matching
    argument is not given: ``J2CFG_CONFIG``, ``J2CFG_PATH``,
    ``J2CFG_SEARCH_PATH``, ``J2CFG_MODULES`` and ``J2CFG_SUFFIX``.
    """

    def __init__(self, strict=True, config_file=None, config_path=None,
                 modules=None, search_path=None, template_suffix=None,
                 dump_only=False):
        """Load the configuration and (optionally) set up the renderer.

        Args:
            strict: when true, rendering errors raise ``ValueError``;
                otherwise they are printed to stderr.  Any non-bool value is
                treated as ``True``.
            config_file: single configuration file to load; falls back to
                the ``J2CFG_CONFIG`` environment variable.  When neither is
                set, the directories in ``config_path`` are scanned instead.
            config_path: directories scanned for configuration files; falls
                back to ``J2CFG_PATH`` (colon-separated), then to the
                ``J2CFG_PATH`` setting.
            modules: names of Python modules exposed to templates; falls
                back to ``J2CFG_MODULES``, then to ``J2CFG_PYTHON_MODULES``.
            search_path: template search directories; falls back to
                ``J2CFG_SEARCH_PATH`` (colon-separated), then to a copy of
                ``config_path``.
            template_suffix: template file name suffix; falls back to
                ``J2CFG_SUFFIX``, then to ``J2CFG_TEMPLATE_EXT``.  A leading
                dot is added when missing.
            dump_only: when true, only the configuration is loaded and the
                rendering related attributes are never created.
        """
        # bool(None) is False, so None needs no dedicated branch.
        self.dump_only = bool(dump_only)

        self.config_file = config_file or os.getenv('J2CFG_CONFIG')
        if self.config_file is not None:
            self.config_file = str(self.config_file)

        self.config_path = config_path
        if self.config_path is None:
            self.config_path = os.getenv('J2CFG_PATH')
            if self.config_path is not None:
                self.config_path = str_split_to_list(self.config_path, ':')
        if self.config_path is None:
            self.config_path = J2CFG_PATH.copy()
        else:
            self.config_path = any_to_str_list(self.config_path)
            self.config_path = uniq_str_list(self.config_path)

        # Template keyword arguments; 'j2cfg' holds the merged configuration.
        self.kwargs = {'j2cfg': {}}

        def merge_dict_from_file(filename):
            """Merge one YAML/JSON file into the config; return success."""
            if filename is None:
                return False
            f = str(filename)
            if f == '':
                return False
            if not os.path.exists(f):
                # Missing files are skipped silently.
                return False
            if not os.path.isfile(f):
                print(
                    f'J2cfg: not a file, skipping: {filename}',
                    file=sys.stderr)
                return False

            if f.endswith(('.yml', '.yaml')):
                with open(f, mode='r', encoding='utf-8') as fx:
                    # A YAML file may hold several documents; empty ones
                    # are ignored, the rest are merged in order.
                    for x in yaml.safe_load_all(fx):
                        if not x:
                            continue
                        self.kwargs['j2cfg'] = merge_dict_recurse(
                            self.kwargs['j2cfg'], x
                        )
                return True

            if f.endswith('.json'):
                with open(f, mode='r', encoding='utf-8') as fx:
                    self.kwargs['j2cfg'] = merge_dict_recurse(
                        self.kwargs['j2cfg'], json.load(fx)
                    )
                return True

            print(
                f'J2cfg: non-recognized name extension: {filename}',
                file=sys.stderr)
            return False

        def merge_dict_default():
            """Merge every config file found in the config directories."""
            search_pattern = '|'.join(
                ['*.' + ext for ext in J2CFG_CONFIG_EXT]
            )
            search_flags = wcmatch.wcmatch.SYMLINKS

            for d in self.config_path:
                if not os.path.isdir(d):
                    continue
                m = wcmatch.wcmatch.WcMatch(d, search_pattern,
                                            flags=search_flags)
                # Sorted so that the merge order is deterministic.
                for f in sorted(m.match()):
                    if self.dump_only:
                        # In dump mode report what is being loaded and,
                        # for symlinks, where the file really lives.
                        real_f = os.path.realpath(f)
                        if f == real_f:
                            print(
                                f'J2cfg: try loading {f}',
                                file=sys.stderr
                            )
                        else:
                            print(
                                f'J2cfg: try loading {f} <- {real_f}',
                                file=sys.stderr
                            )
                    merge_dict_from_file(f)

        if self.config_file is None:
            merge_dict_default()
        else:
            if os.path.isfile(self.config_file):
                merge_dict_from_file(self.config_file)
            else:
                print(
                    'J2cfg: J2cfg config file does not exist, skipping: '
                    + f'{self.config_file}',
                    file=sys.stderr
                )

        if self.dump_only:
            # Nothing below is needed just to dump the configuration.
            return

        self.strict = strict
        if not isinstance(self.strict, bool):
            self.strict = True

        self.search_path = search_path
        if self.search_path is None:
            self.search_path = os.getenv('J2CFG_SEARCH_PATH')
            if self.search_path is not None:
                self.search_path = str_split_to_list(self.search_path, ':')
        if self.search_path is None:
            self.search_path = self.config_path.copy()
        else:
            self.search_path = any_to_str_list(self.search_path)
            self.search_path = uniq_str_list(self.search_path)
        # RFC: should we use the current working directory early?
        for d in [os.getcwd()]:
            if d not in self.search_path:
                self.search_path.insert(0, d)

        self.modules = modules or os.getenv('J2CFG_MODULES')
        if self.modules is None:
            self.modules = J2CFG_PYTHON_MODULES.copy()
        else:
            if isinstance(self.modules, str):
                self.modules = str_split_to_list(self.modules)
            else:
                self.modules = any_to_str_list(self.modules)
            self.modules = uniq_str_list(self.modules)

        suffix = template_suffix or os.getenv('J2CFG_SUFFIX')
        suffix = '' if suffix is None else str(suffix)
        if suffix == '':
            suffix = '' + J2CFG_TEMPLATE_EXT
        if not suffix.startswith('.'):
            suffix = '.' + suffix
        self.template_suffix = suffix

        self.kwargs.update({
            'env': os.environ,
            'env_preserve': J2CFG_PRESERVE_ENVS.copy(),
            'env_passthrough': J2CFG_PASSTHROUGH_ENVS.copy(),
        })
        for m in self.modules:
            if m in self.kwargs:
                # Never let a module shadow an already present key.
                print(f'J2cfg: kwargs already has {m} key',
                      file=sys.stderr)
                continue
            self.kwargs[m] = importlib.import_module(m)

        # One filesystem loader per directory, reused by render_file().
        self.j2fs_loaders = {
            d: jinja2.FileSystemLoader(
                d, encoding='utf-8', followlinks=True,
            ) for d in self.search_path
        }
        self.j2env = jinja2.Environment(
            extensions=J2CFG_JINJA_EXTENSIONS,
            loader=jinja2.ChoiceLoader([
                self.j2fs_loaders[d] for d in self.search_path
            ]),
        )

        def init_env(e: jinja2.Environment):
            """Register the J2CFG_FILTERS callables under their names."""
            for s in J2CFG_FILTERS:
                n = s.__name__
                if n in e.filters:
                    # Existing filters are kept, not overridden.
                    print(f'J2cfg: filters already has {n} key',
                          file=sys.stderr)
                    continue
                e.filters[n] = s

        init_env(self.j2env)

    def dump_config(self):
        """Return the merged configuration serialized as YAML text."""
        return yaml.safe_dump(self.kwargs['j2cfg'])

    def ensure_fs_loader_for(self, directory: str):
        """Create and cache a filesystem loader for ``directory`` if missing.

        Raises:
            ValueError: if the instance was created with ``dump_only``.
        """
        if self.dump_only:
            raise ValueError('dump_only is True')
        if directory in self.j2fs_loaders:
            return
        self.j2fs_loaders[directory] = jinja2.FileSystemLoader(
            directory, encoding='utf-8', followlinks=True,
        )

    def render_file(self, file_in, file_out=None) -> bool:
        """Render template ``file_in`` and write the result to ``file_out``.

        Args:
            file_in: path of the template file.
            file_out: path of the output file.  When ``None``, it is derived
                from ``file_in`` by removing the template suffix.

        Returns:
            ``True`` on success.  ``False`` when a problem was detected and
            the instance is not strict (the problem is printed to stderr).

        Raises:
            ValueError: if the instance was created with ``dump_only``, or
                when a problem is detected and the instance is strict.
        """
        if self.dump_only:
            raise ValueError('dump_only is True')

        def render_error(msg) -> bool:
            """Raise in strict mode, otherwise report and return False."""
            if self.strict:
                raise ValueError(msg)
            print(f'J2cfg: {msg}', file=sys.stderr)
            return False

        if file_in is None:
            return render_error(
                'argument "file_in" is None')
        f_in = str(file_in)
        if f_in == '':
            return render_error(
                'argument "file_in" is empty')
        if not os.path.exists(f_in):
            return render_error(
                f'file is missing: {file_in}')
        if not os.path.isfile(f_in):
            return render_error(
                f'not a file: {file_in}')

        f_out = file_out
        if f_out is None:
            if not f_in.endswith(self.template_suffix):
                return render_error(
                    f'input file name extension mismatch: {file_in}')
            # Strip exactly the configured suffix: os.path.splitext() only
            # removes the last dot component, which is wrong for suffixes
            # that contain more than one dot (e.g. ".conf.j2").
            f_out = f_in[:-len(self.template_suffix)]
            if os.path.basename(f_out) == '':
                # The file name consists of the suffix alone.
                return render_error(
                    f'unable to derive output file name: {file_in}')

        # Extend the search path for this render only: the current working
        # directory and the template's own directory take precedence.
        dirs = self.search_path.copy()
        for d in [os.getcwd(), os.path.dirname(f_in)]:
            if d in dirs:
                continue
            self.ensure_fs_loader_for(d)
            dirs.insert(0, d)
        if f_in.startswith('/'):
            # Last resort loader so that an absolute path can be resolved.
            self.ensure_fs_loader_for('/')
            dirs.append('/')

        j2_environ = self.j2env.overlay(loader=jinja2.ChoiceLoader([
            self.j2fs_loaders[d] for d in dirs
        ]))
        j2_template = j2_environ.get_template(f_in)
        rendered = j2_template.render(**self.kwargs)

        if os.path.lexists(f_out):
            # Refuse to write through symlinks or onto non-regular files.
            if os.path.islink(f_out) or (not os.path.isfile(f_out)):
                return render_error(
                    f'output file is not safely writable: {f_out}')
        if os.path.exists(f_out):
            if os.path.samefile(f_in, f_out):
                return render_error(
                    f'unable to process template inplace: {file_in}')

        with open(f_out, mode='w', encoding='utf-8') as f:
            f.write(rendered)
        return True