treewide: improve template expansion
also: support *.toml as extra configuration dictionaries
This commit is contained in:
@@ -9,7 +9,7 @@ def main():
|
||||
import j2cfg
|
||||
|
||||
j = j2cfg.J2cfg(dump_only=True)
|
||||
print(j.dump_config())
|
||||
print(j.dump_config_yml())
|
||||
|
||||
sys.exit(0)
|
||||
|
@@ -1,8 +1,10 @@
|
||||
import importlib
|
||||
import json
|
||||
import os
|
||||
import os.path
|
||||
import sys
|
||||
import datetime
|
||||
import importlib
|
||||
import json
|
||||
import tomllib
|
||||
|
||||
import jinja2
|
||||
import wcmatch.wcmatch
|
||||
@@ -12,13 +14,29 @@ from .settings import *
|
||||
from .functions import *
|
||||
|
||||
|
||||
J2CFG_CONFIG_EXT = ['yml', 'yaml', 'json']
|
||||
|
||||
|
||||
class J2cfg:
|
||||
def __init__(self, strict=True, config_file=None, config_path=None,
|
||||
modules=None, search_path=None, template_suffix=None,
|
||||
dump_only=False):
|
||||
def ensure_fs_loader_for(self, directory: str, from_init = False) -> bool:
|
||||
if self.dump_only:
|
||||
raise ValueError('dump_only is True')
|
||||
|
||||
if self.j2fs_loaders is None:
|
||||
raise ValueError('j2fs_loaders is None')
|
||||
if directory in self.j2fs_loaders:
|
||||
return not bool(from_init)
|
||||
if directory == '':
|
||||
print('J2cfg: ensure_fs_loader_for(): empty directory name, skipping', file=sys.stderr)
|
||||
return False
|
||||
if not os.path.isdir(directory):
|
||||
print(f'J2cfg: ensure_fs_loader_for(): not a directory or does not exist, skipping: {directory}', file=sys.stderr)
|
||||
return False
|
||||
self.j2fs_loaders[directory] = jinja2.FileSystemLoader(
|
||||
directory, encoding='utf-8', followlinks=True,
|
||||
)
|
||||
return True
|
||||
|
||||
def __init__(self, dump_only=False, strict=True, unlink_source=None,
|
||||
config_file=None, config_path=None, search_path=None,
|
||||
):
|
||||
|
||||
if dump_only is None:
|
||||
self.dump_only = False
|
||||
@@ -42,64 +60,63 @@ class J2cfg:
|
||||
|
||||
self.kwargs = {'j2cfg': {}}
|
||||
|
||||
def merge_dict_from_file(filename):
|
||||
def merge_dict_from_file(filename) -> bool:
|
||||
if filename is None:
|
||||
return False
|
||||
f = str(filename)
|
||||
if f == '':
|
||||
if f == "":
|
||||
return False
|
||||
if not os.path.exists(f):
|
||||
print(f'J2cfg: merge_dict_from_file(): path does not exist, skipping: {filename}', file=sys.stderr)
|
||||
return False
|
||||
if not os.path.isfile(f):
|
||||
print(
|
||||
f'J2cfg: not a file, skipping: {filename}',
|
||||
file=sys.stderr)
|
||||
print(f'J2cfg: merge_dict_from_file(): not a file, skipping: {filename}', file=sys.stderr)
|
||||
return False
|
||||
|
||||
if f.endswith('.yml') or f.endswith('.yaml'):
|
||||
ext = os.path.splitext(f)[1]
|
||||
if ext not in J2CFG_CONFIG_EXT:
|
||||
print(f'J2cfg: merge_dict_from_file(): non-recognized name extension: {f}', file=sys.stderr)
|
||||
return False
|
||||
|
||||
if ext in [ '.yml', '.yaml' ]:
|
||||
with open(f, mode='r', encoding='utf-8') as fx:
|
||||
for x in yaml.safe_load_all(fx):
|
||||
if not x:
|
||||
# print(f'J2cfg: received empty document from: {f}', file=sys.stderr)
|
||||
continue
|
||||
self.kwargs['j2cfg'] = merge_dict_recurse(
|
||||
self.kwargs['j2cfg'], x
|
||||
)
|
||||
self.kwargs['j2cfg'] = merge_dict_recurse(self.kwargs['j2cfg'], x)
|
||||
return True
|
||||
|
||||
if f.endswith('.json'):
|
||||
if ext == '.toml':
|
||||
with open(f, mode='rb') as fx:
|
||||
x = tomllib.load(fx)
|
||||
self.kwargs['j2cfg'] = merge_dict_recurse(self.kwargs['j2cfg'], x)
|
||||
return True
|
||||
|
||||
if ext == '.json':
|
||||
with open(f, mode='r', encoding='utf-8') as fx:
|
||||
self.kwargs['j2cfg'] = merge_dict_recurse(
|
||||
self.kwargs['j2cfg'], json.load(fx)
|
||||
)
|
||||
x = json.load(fx)
|
||||
self.kwargs['j2cfg'] = merge_dict_recurse(self.kwargs['j2cfg'], x)
|
||||
return True
|
||||
|
||||
print(
|
||||
f'J2cfg: non-recognized name extension: {filename}',
|
||||
file=sys.stderr)
|
||||
return False
|
||||
|
||||
def merge_dict_default():
|
||||
search_pattern = '|'.join(['*.' + ext for ext in J2CFG_CONFIG_EXT])
|
||||
search_pattern = '|'.join( [ '*' + ext for ext in J2CFG_CONFIG_EXT ] )
|
||||
search_flags = wcmatch.wcmatch.SYMLINKS
|
||||
|
||||
for d in self.config_path:
|
||||
if not os.path.isdir(d):
|
||||
print(f'J2cfg: merge_dict_default(): not a directory or does not exist, skipping: {d}', file=sys.stderr)
|
||||
continue
|
||||
m = wcmatch.wcmatch.WcMatch(d, search_pattern,
|
||||
flags=search_flags)
|
||||
m = wcmatch.wcmatch.WcMatch(d, search_pattern, flags=search_flags)
|
||||
for f in sorted(m.match()):
|
||||
if self.dump_only:
|
||||
real_f = os.path.realpath(f)
|
||||
if f == real_f:
|
||||
print(
|
||||
f'J2cfg: try loading {f}',
|
||||
file=sys.stderr
|
||||
)
|
||||
print(f'J2cfg: try loading {f}', file=sys.stderr)
|
||||
else:
|
||||
print(
|
||||
f'J2cfg: try loading {f} <- {real_f}',
|
||||
file=sys.stderr
|
||||
)
|
||||
print(f'J2cfg: try loading {f} <- {real_f}', file=sys.stderr)
|
||||
merge_dict_from_file(f)
|
||||
|
||||
if self.config_file is None:
|
||||
@@ -109,11 +126,7 @@ class J2cfg:
|
||||
if os.path.isfile(self.config_file):
|
||||
merge_dict_from_file(self.config_file)
|
||||
else:
|
||||
print(
|
||||
'J2cfg: J2cfg config file does not exist, skipping: '
|
||||
+ f'{self.config_file}',
|
||||
file=sys.stderr
|
||||
)
|
||||
print('J2cfg: config file does not exist, skipping: {self.config_file}', file=sys.stderr)
|
||||
|
||||
if self.dump_only:
|
||||
return
|
||||
@@ -122,6 +135,15 @@ class J2cfg:
|
||||
if not isinstance(self.strict, bool):
|
||||
self.strict = True
|
||||
|
||||
if unlink_source is not None:
|
||||
self.unlink_source = from_gobool(unlink_source)
|
||||
else:
|
||||
x = os.getenv('J2CFG_UNLINK_SRC')
|
||||
if x is None:
|
||||
self.unlink_source = False
|
||||
else:
|
||||
self.unlink_source = from_gobool(x)
|
||||
|
||||
self.search_path = search_path
|
||||
if self.search_path is None:
|
||||
self.search_path = os.getenv('J2CFG_SEARCH_PATH')
|
||||
@@ -132,85 +154,55 @@ class J2cfg:
|
||||
else:
|
||||
self.search_path = uniq_str_list(any_to_str_list(self.search_path))
|
||||
# RFC: should we use the current working directory early?
|
||||
for d in [os.getcwd()]:
|
||||
for d in [ os.getcwd() ]:
|
||||
if d not in self.search_path:
|
||||
self.search_path.insert(0, d)
|
||||
|
||||
self.modules = modules or os.getenv('J2CFG_MODULES')
|
||||
if self.modules is None:
|
||||
self.modules = J2CFG_PYTHON_MODULES
|
||||
else:
|
||||
if isinstance(self.modules, str):
|
||||
self.modules = str_split_to_list(self.modules)
|
||||
else:
|
||||
self.modules = any_to_str_list(self.modules)
|
||||
self.modules = uniq_str_list(self.modules)
|
||||
|
||||
self.template_suffix = template_suffix or os.getenv('J2CFG_SUFFIX')
|
||||
if self.template_suffix is None:
|
||||
self.template_suffix = J2CFG_TEMPLATE_EXT
|
||||
else:
|
||||
self.template_suffix = str(self.template_suffix)
|
||||
if self.template_suffix == '':
|
||||
self.template_suffix = J2CFG_TEMPLATE_EXT
|
||||
if not self.template_suffix.startswith('.'):
|
||||
self.template_suffix = '.' + self.template_suffix
|
||||
|
||||
self.kwargs.update({
|
||||
'env': os.environ,
|
||||
'env_vars_preserve': J2CFG_PRESERVE_ENVS,
|
||||
'env_vars_passthrough': J2CFG_PASSTHROUGH_ENVS,
|
||||
})
|
||||
|
||||
self.j2fs_loaders = {
|
||||
d: jinja2.FileSystemLoader(
|
||||
d, encoding='utf-8', followlinks=True,
|
||||
) for d in self.search_path
|
||||
}
|
||||
self.j2fs_loaders = {}
|
||||
t = []
|
||||
for d in self.search_path:
|
||||
d = os.path.abspath(d)
|
||||
if self.ensure_fs_loader_for(d, from_init=True):
|
||||
t.append(d)
|
||||
continue
|
||||
self.search_path = t
|
||||
self.j2env = jinja2.Environment(
|
||||
extensions=J2CFG_JINJA_EXTENSIONS,
|
||||
loader=jinja2.ChoiceLoader([
|
||||
self.j2fs_loaders[d] for d in self.search_path
|
||||
]),
|
||||
loader=jinja2.ChoiceLoader(
|
||||
[ self.j2fs_loaders[d] for d in self.search_path ]
|
||||
),
|
||||
)
|
||||
|
||||
def init_env(e: jinja2.Environment):
|
||||
for m in self.modules:
|
||||
for m in J2CFG_PYTHON_MODULES:
|
||||
if m in e.globals:
|
||||
print(f'J2cfg: globals already has {m} key, module will not be imported',
|
||||
file=sys.stderr)
|
||||
print(f'J2cfg: globals already has {m} key, module will not be imported', file=sys.stderr)
|
||||
continue
|
||||
e.globals.update({m: importlib.import_module(m)})
|
||||
e.globals.update( { m: importlib.import_module(m) } )
|
||||
for s in J2CFG_FUNCTIONS:
|
||||
n = s.__name__
|
||||
if n in e.globals:
|
||||
print(f'J2cfg: globals already has {n} key, function will not be imported',
|
||||
file=sys.stderr)
|
||||
print(f'J2cfg: globals already has {n} key, function will not be imported', file=sys.stderr)
|
||||
continue
|
||||
e.globals.update({n: s})
|
||||
e.globals.update( { n: s } )
|
||||
for s in J2CFG_FILTERS:
|
||||
n = s.__name__
|
||||
if n in e.filters:
|
||||
print(f'J2cfg: filters already has {n} key, filter will not be imported',
|
||||
file=sys.stderr)
|
||||
print(f'J2cfg: filters already has {n} key, filter will not be imported', file=sys.stderr)
|
||||
continue
|
||||
e.filters.update({n: s})
|
||||
e.filters.update( { n: s } )
|
||||
|
||||
init_env(self.j2env)
|
||||
|
||||
def dump_config(self):
|
||||
def dump_config_yml(self):
|
||||
return yaml.safe_dump(self.kwargs['j2cfg'])
|
||||
|
||||
def ensure_fs_loader_for(self, directory: str):
|
||||
if self.dump_only:
|
||||
raise ValueError('dump_only is True')
|
||||
|
||||
if directory in self.j2fs_loaders:
|
||||
return
|
||||
self.j2fs_loaders[directory] = jinja2.FileSystemLoader(
|
||||
directory, encoding='utf-8', followlinks=True,
|
||||
)
|
||||
|
||||
def render_file(self, file_in, file_out=None) -> bool:
|
||||
if self.dump_only:
|
||||
raise ValueError('dump_only is True')
|
||||
@@ -218,56 +210,93 @@ class J2cfg:
|
||||
def render_error(msg) -> bool:
|
||||
if self.strict:
|
||||
raise ValueError(msg)
|
||||
print(f'J2cfg: {msg}', file=sys.stderr)
|
||||
print(f'J2cfg: render_file(): {msg}', file=sys.stderr)
|
||||
return False
|
||||
|
||||
_STDIN = '/dev/stdin'
|
||||
_STDOUT = '/dev/stdout'
|
||||
|
||||
if file_in is None:
|
||||
return render_error(
|
||||
'argument "file_in" is None')
|
||||
return render_error('argument "file_in" is None')
|
||||
f_in = str(file_in)
|
||||
if f_in == '':
|
||||
return render_error(
|
||||
'argument "file_in" is empty')
|
||||
if not os.path.exists(f_in):
|
||||
return render_error(
|
||||
f'file is missing: {file_in}')
|
||||
if not os.path.isfile(f_in):
|
||||
return render_error(
|
||||
f'not a file: {file_in}')
|
||||
return render_error('argument "file_in" is empty')
|
||||
if f_in == '-':
|
||||
f_in = _STDIN
|
||||
if f_in == _STDIN:
|
||||
f_stdin = True
|
||||
else:
|
||||
f_stdin = os.path.samefile(f_in, _STDIN)
|
||||
if not f_stdin:
|
||||
if not os.path.exists(f_in):
|
||||
return render_error(f'file is missing: {file_in}')
|
||||
if not os.path.isfile(f_in):
|
||||
return render_error(f'not a file: {file_in}')
|
||||
|
||||
f_out = file_out
|
||||
if f_out is None:
|
||||
if not f_in.endswith(self.template_suffix):
|
||||
return render_error(
|
||||
f'input file name extension mismatch: {file_in}')
|
||||
f_out = os.path.splitext(f_in)[0]
|
||||
if f_stdin:
|
||||
f_out = _STDOUT
|
||||
else:
|
||||
if not f_in.endswith(J2CFG_TEMPLATE_EXT):
|
||||
return render_error(f'input file name extension mismatch: {file_in}')
|
||||
f_out = os.path.splitext(f_in)[0]
|
||||
if f_out == '-':
|
||||
f_out = _STDOUT
|
||||
if f_out == _STDOUT:
|
||||
f_stdout = True
|
||||
else:
|
||||
f_stdout = os.path.exists(f_out) and os.path.samefile(f_out, _STDOUT)
|
||||
|
||||
dirs = self.search_path.copy()
|
||||
for d in [os.getcwd(), os.path.dirname(f_in)]:
|
||||
if d in dirs:
|
||||
continue
|
||||
self.ensure_fs_loader_for(d)
|
||||
dirs.insert(0, d)
|
||||
if f_in.startswith('/'):
|
||||
self.ensure_fs_loader_for('/')
|
||||
dirs.append('/')
|
||||
extra_dirs = [ os.getcwd() ]
|
||||
if not f_stdin:
|
||||
f_in_dir = os.path.dirname(f_in)
|
||||
if f_in_dir == '':
|
||||
f_in_dir = '.'
|
||||
extra_dirs.append(f_in_dir)
|
||||
for d in extra_dirs:
|
||||
d = os.path.abspath(d)
|
||||
if self.ensure_fs_loader_for(d):
|
||||
dirs.insert(0, d)
|
||||
|
||||
j2_environ = self.j2env.overlay(loader=jinja2.ChoiceLoader([
|
||||
self.j2fs_loaders[d] for d in dirs
|
||||
]))
|
||||
j2_template = j2_environ.get_template(f_in)
|
||||
if not f_stdin:
|
||||
if f_in.startswith('/'):
|
||||
for d in [ '/' ]:
|
||||
if d in dirs:
|
||||
continue
|
||||
if self.ensure_fs_loader_for(d):
|
||||
dirs.append('/')
|
||||
|
||||
j2_environ = self.j2env.overlay(
|
||||
loader=jinja2.ChoiceLoader(
|
||||
[ self.j2fs_loaders[d] for d in dirs ]
|
||||
)
|
||||
)
|
||||
if f_stdin:
|
||||
j2_template = j2_environ.from_string(''.join(sys.stdin.readlines()))
|
||||
else:
|
||||
j2_template = j2_environ.get_template(f_in)
|
||||
rendered = j2_template.render(**self.kwargs)
|
||||
|
||||
if os.path.lexists(f_out):
|
||||
if os.path.islink(f_out) or (not os.path.isfile(f_out)):
|
||||
return render_error(
|
||||
f'output file is not safely writable: {f_out}')
|
||||
if os.path.exists(f_out):
|
||||
if os.path.samefile(f_in, f_out):
|
||||
return render_error(
|
||||
f'unable to process template inplace: {file_in}')
|
||||
if not (f_stdin and f_stdout):
|
||||
if os.path.exists(f_out) and os.path.samefile(f_in, f_out):
|
||||
return render_error(f'unable to process template inplace: {file_in}')
|
||||
if not f_stdout:
|
||||
if os.path.lexists(f_out) and (os.path.islink(f_out) or (not os.path.isfile(f_out))):
|
||||
return render_error(f'output file is not safely writable: {f_out}')
|
||||
|
||||
with open(f_out, mode='w', encoding='utf-8') as f:
|
||||
f.write(rendered)
|
||||
if f_stdout:
|
||||
sys.stdout.write(rendered)
|
||||
sys.stdout.flush()
|
||||
else:
|
||||
with open(f_out, mode='w', encoding='utf-8') as f:
|
||||
f.write(rendered)
|
||||
f.flush()
|
||||
|
||||
if self.unlink_source:
|
||||
if f_stdin:
|
||||
return render_error('cannot unlink stdin')
|
||||
os.unlink(f_in)
|
||||
|
||||
return True
|
||||
|
@@ -1,48 +1,58 @@
|
||||
import os
|
||||
import sys
|
||||
import pathlib
|
||||
import os.path
|
||||
import collections.abc
|
||||
import itertools
|
||||
import os.path
|
||||
import pathlib
|
||||
import hashlib
|
||||
import re
|
||||
import sys
|
||||
|
||||
import jinja2
|
||||
|
||||
from .settings import is_env_banned
|
||||
from .settings import J2CFG_BANNED_ENVS
|
||||
|
||||
|
||||
def is_sequence(x) -> bool:
|
||||
if isinstance(x, str):
|
||||
return False
|
||||
return isinstance(x, collections.abc.Sequence)
|
||||
J2CFG_BUILTIN_FUNCTIONS = [
|
||||
repr,
|
||||
sorted,
|
||||
]
|
||||
|
||||
|
||||
def is_mapping(x) -> bool:
|
||||
def is_str(x) -> bool:
|
||||
return isinstance(x, str)
|
||||
|
||||
|
||||
def is_seq(x) -> bool:
|
||||
return isinstance(x, collections.abc.Sequence) and not is_str(x)
|
||||
|
||||
|
||||
def is_map(x) -> bool:
|
||||
return isinstance(x, collections.abc.Mapping)
|
||||
|
||||
|
||||
def uniq(a: list | set) -> list:
|
||||
return sorted(set(a))
|
||||
return list(set(a))
|
||||
|
||||
|
||||
def remove_non_str(a: list | set) -> list:
|
||||
return list(filter(lambda x: isinstance(x, str), a))
|
||||
def only_str(a: list | set) -> list:
|
||||
return list(filter(is_str, a))
|
||||
|
||||
|
||||
def remove_empty_str(a: list | set) -> list:
|
||||
def non_empty_str(a: list | set) -> list:
|
||||
return list(filter(None, a))
|
||||
|
||||
|
||||
def uniq_str_list(a: list | set) -> list:
|
||||
return remove_empty_str(uniq(a))
|
||||
return uniq(non_empty_str(a))
|
||||
|
||||
|
||||
def str_split_to_list(s: str, sep=r'\s+') -> list:
|
||||
return remove_empty_str(re.split(sep, s))
|
||||
return non_empty_str(re.split(sep, s))
|
||||
|
||||
|
||||
def dict_to_env_str_list(x: dict) -> list:
|
||||
def dict_to_str_list(x: dict) -> list:
|
||||
r = []
|
||||
for k in sorted(x.keys()):
|
||||
for k in x.keys():
|
||||
if x[k] is None:
|
||||
r.append(f'{k}')
|
||||
else:
|
||||
@@ -53,152 +63,142 @@ def dict_to_env_str_list(x: dict) -> list:
|
||||
def any_to_str_list(x) -> list:
|
||||
if x is None:
|
||||
return []
|
||||
if isinstance(x, str):
|
||||
if is_str(x):
|
||||
return [x]
|
||||
if is_sequence(x):
|
||||
if is_seq(x):
|
||||
return [str(e) for e in x]
|
||||
if is_mapping(x):
|
||||
return dict_to_env_str_list(x)
|
||||
if is_map(x):
|
||||
return dict_to_str_list(x)
|
||||
return [str(x)]
|
||||
|
||||
|
||||
def is_re_match(x, pattern, flags=0) -> bool:
|
||||
if isinstance(x, str):
|
||||
return bool(re.match(pattern, x, flags))
|
||||
if is_sequence(x):
|
||||
return any(is_re_match(v, pattern, flags) for v in x)
|
||||
if is_mapping(x):
|
||||
return any(is_re_match(v, pattern, flags) for v in x.keys())
|
||||
def is_re_match(x, pat, opt=0) -> bool:
|
||||
if is_str(x):
|
||||
return bool(re.match(pat, x, opt))
|
||||
if is_seq(x):
|
||||
return any(is_re_match(v, pat, opt) for v in x)
|
||||
if is_map(x):
|
||||
return any(is_re_match(v, pat, opt) for v in x.keys())
|
||||
return False
|
||||
|
||||
|
||||
def is_re_fullmatch(x, pattern, flags=0) -> bool:
|
||||
if isinstance(x, str):
|
||||
return bool(re.fullmatch(pattern, x, flags))
|
||||
if is_sequence(x):
|
||||
return any(is_re_fullmatch(v, pattern, flags) for v in x)
|
||||
if is_mapping(x):
|
||||
return any(is_re_fullmatch(v, pattern, flags) for v in x.keys())
|
||||
def is_re_fullmatch(x, pat, opt=0) -> bool:
|
||||
if is_str(x):
|
||||
return bool(re.fullmatch(pat, x, opt))
|
||||
if is_seq(x):
|
||||
return any(is_re_fullmatch(v, pat, opt) for v in x)
|
||||
if is_map(x):
|
||||
return any(is_re_fullmatch(v, pat, opt) for v in x.keys())
|
||||
return False
|
||||
|
||||
|
||||
def re_match(x, pattern, flags=0):
|
||||
if isinstance(x, str):
|
||||
return re.match(pattern, x, flags)
|
||||
if is_sequence(x):
|
||||
return [v for v in x
|
||||
if re_match(v, pattern, flags)]
|
||||
if is_mapping(x):
|
||||
return {k: v for k, v in x.items()
|
||||
if re_match(k, pattern, flags)}
|
||||
def re_match(x, pat, opt=0):
|
||||
if is_str(x):
|
||||
return re.match(pat, x, opt)
|
||||
if is_seq(x):
|
||||
return [v for v in x if re_match(v, pat, opt)]
|
||||
if is_map(x):
|
||||
return {k: v for k, v in x.items() if re_match(k, pat, opt)}
|
||||
return None
|
||||
|
||||
|
||||
def re_fullmatch(x, pattern, flags=0):
|
||||
if isinstance(x, str):
|
||||
return re.fullmatch(pattern, x, flags)
|
||||
if is_sequence(x):
|
||||
return [v for v in x
|
||||
if re_fullmatch(v, pattern, flags)]
|
||||
if is_mapping(x):
|
||||
return {k: v for k, v in x.items()
|
||||
if re_fullmatch(k, pattern, flags)}
|
||||
def re_fullmatch(x, pat, opt=0):
|
||||
if is_str(x):
|
||||
return re.fullmatch(pat, x, opt)
|
||||
if is_seq(x):
|
||||
return [v for v in x if re_fullmatch(v, pat, opt)]
|
||||
if is_map(x):
|
||||
return {k: v for k, v in x.items() if re_fullmatch(k, pat, opt)}
|
||||
return None
|
||||
|
||||
|
||||
def re_match_negate(x, pattern, flags=0):
|
||||
if isinstance(x, str):
|
||||
return not bool(re.match(pattern, x, flags))
|
||||
if is_sequence(x):
|
||||
return [v for v in x
|
||||
if re_match_negate(v, pattern, flags)]
|
||||
if is_mapping(x):
|
||||
return {k: v for k, v in x.items()
|
||||
if re_match_negate(k, pattern, flags)}
|
||||
def re_match_neg(x, pat, opt=0):
|
||||
if is_str(x):
|
||||
return not bool(re.match(pat, x, opt))
|
||||
if is_seq(x):
|
||||
return [v for v in x if re_match_neg(v, pat, opt)]
|
||||
if is_map(x):
|
||||
return {k: v for k, v in x.items() if re_match_neg(k, pat, opt)}
|
||||
return x
|
||||
|
||||
|
||||
def re_fullmatch_negate(x, pattern, flags=0):
|
||||
if isinstance(x, str):
|
||||
return not bool(re.fullmatch(pattern, x, flags))
|
||||
if is_sequence(x):
|
||||
return [v for v in x
|
||||
if re_fullmatch_negate(v, pattern, flags)]
|
||||
if is_mapping(x):
|
||||
return {k: v for k, v in x.items()
|
||||
if re_fullmatch_negate(k, pattern, flags)}
|
||||
def re_fullmatch_neg(x, pat, opt=0):
|
||||
if is_str(x):
|
||||
return not bool(re.fullmatch(pat, x, opt))
|
||||
if is_seq(x):
|
||||
return [v for v in x if re_fullmatch_neg(v, pat, opt)]
|
||||
if is_map(x):
|
||||
return {k: v for k, v in x.items() if re_fullmatch_neg(k, pat, opt)}
|
||||
return x
|
||||
|
||||
|
||||
def dict_remap_keys(x: dict, key_map) -> dict:
|
||||
if key_map is None:
|
||||
print('dict_remap_keys: key_map is None', file=sys.stderr)
|
||||
return x
|
||||
p = set(x.keys())
|
||||
m = {}
|
||||
for k in x:
|
||||
for k in set(x.keys()):
|
||||
v = key_map(k)
|
||||
if v == k:
|
||||
if v in m:
|
||||
# merely debug output
|
||||
print(f'dict_remap_keys: duplicate key {repr(v)} <= {repr(k)}', file=sys.stderr)
|
||||
continue
|
||||
m[k] = v
|
||||
p.discard(k)
|
||||
p.discard(v)
|
||||
return {k: x[k] for k in p} | {v: x[k] for k, v in m.items()}
|
||||
m[v] = x[k]
|
||||
return m
|
||||
|
||||
|
||||
def re_sub(x, pattern, repl, count=0, flags=0):
|
||||
if isinstance(x, str):
|
||||
return re.sub(pattern, repl, x, count, flags)
|
||||
if is_sequence(x):
|
||||
return [
|
||||
re_sub(v, pattern, repl, count, flags)
|
||||
for v in x
|
||||
]
|
||||
if is_mapping(x):
|
||||
return dict_remap_keys(
|
||||
x, lambda k:
|
||||
re_sub(k, pattern, repl, count, flags)
|
||||
)
|
||||
def re_sub(x, pat, repl, count=0, opt=0):
|
||||
if is_str(x):
|
||||
return re.sub(pat, repl, x, count, opt)
|
||||
if is_seq(x):
|
||||
return [re_sub(v, pat, repl, count, opt) for v in x]
|
||||
if is_map(x):
|
||||
return dict_remap_keys(x, lambda k: re_sub(k, pat, repl, count, opt))
|
||||
return x
|
||||
|
||||
|
||||
def as_cgi_hdr(x):
|
||||
if isinstance(x, str):
|
||||
return 'HTTP_' + re.sub('[^A-Z0-9]+', '_', x.upper()).strip('_')
|
||||
if is_sequence(x):
|
||||
return uniq([
|
||||
as_cgi_hdr(v)
|
||||
for v in x
|
||||
])
|
||||
if is_mapping(x):
|
||||
return dict_remap_keys(
|
||||
x, as_cgi_hdr
|
||||
)
|
||||
def cgi_header(x):
|
||||
if is_str(x):
|
||||
s = re.sub('[^A-Z0-9]+', '_', x.upper()).strip('_')
|
||||
if s == '':
|
||||
# merely debug output
|
||||
print(f'cgi_header: x={repr(x)}', file=sys.stderr)
|
||||
raise ValueError('cgi_header: empty header name')
|
||||
return 'HTTP_' + s
|
||||
if is_seq(x):
|
||||
return [cgi_header(v) for v in x]
|
||||
if is_map(x):
|
||||
return dict_remap_keys(x, cgi_header)
|
||||
return x
|
||||
|
||||
|
||||
def as_ngx_var(x, pfx='custom'):
|
||||
if isinstance(x, str):
|
||||
parts = remove_empty_str(
|
||||
[re.sub('[^a-z0-9]+', '_', str(i).lower()).strip('_')
|
||||
for i in (pfx, x)]
|
||||
)
|
||||
def http_header(x):
|
||||
if is_str(x):
|
||||
s = re.sub('[^a-zA-Z0-9]+', '-', x).strip('-')
|
||||
if s == '':
|
||||
# merely debug output
|
||||
print(f'http_header: x={repr(x)}', file=sys.stderr)
|
||||
raise ValueError('http_header: empty header name')
|
||||
return s
|
||||
if is_seq(x):
|
||||
return [http_header(v) for v in x]
|
||||
if is_map(x):
|
||||
return dict_remap_keys(x, http_header)
|
||||
return x
|
||||
|
||||
|
||||
def ngx_var(x, pfx='custom'):
|
||||
if is_str(x):
|
||||
parts = non_empty_str([re.sub('[^a-z0-9]+', '_', str(i).lower()).strip('_') for i in (pfx, x)])
|
||||
if len(parts) < 2:
|
||||
print(
|
||||
f'as_ngx_var: parts={parts}',
|
||||
file=sys.stderr
|
||||
)
|
||||
raise ValueError('as_ngx_var: incomplete string array')
|
||||
# merely debug output
|
||||
print(f'ngx_var: parts={repr(parts)}', file=sys.stderr)
|
||||
raise ValueError('ngx_var: incomplete string array')
|
||||
return '$' + '_'.join(parts)
|
||||
if is_sequence(x):
|
||||
return uniq([
|
||||
as_ngx_var(v, pfx)
|
||||
for v in x
|
||||
])
|
||||
if is_mapping(x):
|
||||
return dict_remap_keys(
|
||||
x, lambda k:
|
||||
as_ngx_var(k, pfx)
|
||||
)
|
||||
if is_seq(x):
|
||||
return [ngx_var(v, pfx) for v in x]
|
||||
if is_map(x):
|
||||
return dict_remap_keys(x, lambda k: ngx_var(k, pfx))
|
||||
return x
|
||||
|
||||
|
||||
@@ -208,6 +208,12 @@ def any_to_env_dict(x) -> dict:
|
||||
|
||||
h = {}
|
||||
|
||||
def is_env_banned(k: str) -> bool:
|
||||
for r in J2CFG_BANNED_ENVS:
|
||||
if re.match(r, k):
|
||||
return True
|
||||
return False
|
||||
|
||||
def feed(k, parse=False, v=None):
|
||||
if v is None:
|
||||
return
|
||||
@@ -225,12 +231,12 @@ def any_to_env_dict(x) -> dict:
|
||||
return
|
||||
h[k] = v if v is None else str(v)
|
||||
|
||||
if isinstance(x, str):
|
||||
if is_str(x):
|
||||
feed(x, True)
|
||||
elif is_sequence(x):
|
||||
elif is_seq(x):
|
||||
for e in x:
|
||||
feed(e, True)
|
||||
elif is_mapping(x):
|
||||
elif is_map(x):
|
||||
for k in x:
|
||||
feed(k, False, x[k])
|
||||
else:
|
||||
@@ -240,7 +246,7 @@ def any_to_env_dict(x) -> dict:
|
||||
|
||||
|
||||
def dict_keys(x: dict) -> list:
|
||||
return sorted([k for k in x.keys()])
|
||||
return sorted(x.keys())
|
||||
|
||||
|
||||
def dict_empty_keys(x: dict) -> list:
|
||||
@@ -259,39 +265,24 @@ def list_intersect(a: list | set, b: list | set) -> list:
|
||||
return list(set(a) & set(b))
|
||||
|
||||
|
||||
@jinja2.pass_environment
|
||||
def sh_like_file_to_list(j2env, file_in: str) -> list:
|
||||
tpl = j2env.get_template(file_in)
|
||||
text = pathlib.Path(tpl.filename).read_text(encoding='utf-8')
|
||||
lines = re.split(r'[\r\n]', text)
|
||||
return list(itertools.filterfalse(
|
||||
lambda x: re.match(r'\s*#', x), lines
|
||||
))
|
||||
|
||||
|
||||
def ngx_esc(x):
|
||||
if isinstance(x, str):
|
||||
if x == "":
|
||||
if x is None:
|
||||
return None
|
||||
if is_str(x):
|
||||
if x == '':
|
||||
return "''"
|
||||
if re.search(r'(?:\s|[;{}()\[\]\\\'"*?])', x):
|
||||
return repr(x)
|
||||
return x
|
||||
if is_sequence(x):
|
||||
return uniq([
|
||||
ngx_esc(v)
|
||||
for v in x
|
||||
])
|
||||
if is_mapping(x):
|
||||
return dict_remap_keys(
|
||||
x, ngx_esc
|
||||
)
|
||||
if x is None:
|
||||
return None
|
||||
if is_seq(x):
|
||||
return [ngx_esc(v) for v in x]
|
||||
if is_map(x):
|
||||
return dict_remap_keys(x, ngx_esc)
|
||||
return ngx_esc(str(x))
|
||||
|
||||
|
||||
def from_gobool(x) -> bool:
|
||||
if isinstance(x, str):
|
||||
if is_str(x):
|
||||
return x.lower() in {'1', 't', 'true'}
|
||||
return bool(x)
|
||||
|
||||
@@ -304,14 +295,14 @@ def merge_dict_recurse(d1, d2: dict) -> dict:
|
||||
common = keys1 & keys2
|
||||
missing = keys2 - common
|
||||
|
||||
map1 = {k for k in common if is_mapping(x.get(k))}
|
||||
seq1 = {k for k in common if is_sequence(x.get(k))}
|
||||
map1 = {k for k in common if is_map(x.get(k))}
|
||||
seq1 = {k for k in common if is_seq(x.get(k))}
|
||||
misc1 = common - seq1 - map1
|
||||
|
||||
merge_safe = missing | misc1
|
||||
x.update({k: d2.get(k) for k in merge_safe})
|
||||
|
||||
map_common = {k for k in map1 if is_mapping(d2.get(k))}
|
||||
map_common = {k for k in map1 if is_map(d2.get(k))}
|
||||
for k in map_common:
|
||||
y = d2.get(k)
|
||||
if not y:
|
||||
@@ -319,7 +310,7 @@ def merge_dict_recurse(d1, d2: dict) -> dict:
|
||||
continue
|
||||
x[k] = merge_dict_recurse(x.get(k), y)
|
||||
|
||||
seq_common = {k for k in seq1 if is_sequence(d2.get(k))}
|
||||
seq_common = {k for k in seq1 if is_seq(d2.get(k))}
|
||||
for k in seq_common:
|
||||
y = d2.get(k)
|
||||
if not y:
|
||||
@@ -331,10 +322,7 @@ def merge_dict_recurse(d1, d2: dict) -> dict:
|
||||
for k in unmerged:
|
||||
t1 = type(x.get(k))
|
||||
t2 = type(d2.get(k))
|
||||
print(
|
||||
f'merge_dict_recurse(): skipping key {k}'
|
||||
+ f' due to type mismatch: {t1} vs. {t2}',
|
||||
file=sys.stderr)
|
||||
print(f'merge_dict_recurse(): skipping key {k} due to type mismatch: {t1} vs. {t2}', file=sys.stderr)
|
||||
|
||||
return x
|
||||
|
||||
@@ -354,37 +342,159 @@ def join_prefix(prefix: str, *paths) -> str:
|
||||
return rv
|
||||
|
||||
|
||||
J2CFG_FUNCTIONS = [
|
||||
def md5(x: str) -> str:
|
||||
return hashlib.md5(x.encode('utf-8')).hexdigest()
|
||||
|
||||
|
||||
def sha1(x: str) -> str:
|
||||
return hashlib.sha1(x.encode('utf-8')).hexdigest()
|
||||
|
||||
|
||||
def sha256(x: str) -> str:
|
||||
return hashlib.sha256(x.encode('utf-8')).hexdigest()
|
||||
|
||||
|
||||
def sha384(x: str) -> str:
|
||||
return hashlib.sha384(x.encode('utf-8')).hexdigest()
|
||||
|
||||
|
||||
def sha512(x: str) -> str:
|
||||
return hashlib.sha512(x.encode('utf-8')).hexdigest()
|
||||
|
||||
|
||||
def sha3_256(x: str) -> str:
|
||||
return hashlib.sha3_256(x.encode('utf-8')).hexdigest()
|
||||
|
||||
|
||||
def sha3_384(x: str) -> str:
|
||||
return hashlib.sha3_384(x.encode('utf-8')).hexdigest()
|
||||
|
||||
|
||||
def sha3_512(x: str) -> str:
|
||||
return hashlib.sha3_512(x.encode('utf-8')).hexdigest()
|
||||
|
||||
|
||||
def file_md5(x: str) -> str:
|
||||
with open(x, 'rb') as f:
|
||||
return hashlib.md5(f.read()).hexdigest()
|
||||
|
||||
|
||||
def file_sha1(x: str) -> str:
|
||||
with open(x, 'rb') as f:
|
||||
return hashlib.sha1(f.read()).hexdigest()
|
||||
|
||||
|
||||
def file_sha256(x: str) -> str:
|
||||
with open(x, 'rb') as f:
|
||||
return hashlib.sha256(f.read()).hexdigest()
|
||||
|
||||
|
||||
def file_sha384(x: str) -> str:
|
||||
with open(x, 'rb') as f:
|
||||
return hashlib.sha384(f.read()).hexdigest()
|
||||
|
||||
|
||||
def file_sha512(x: str) -> str:
|
||||
with open(x, 'rb') as f:
|
||||
return hashlib.sha512(f.read()).hexdigest()
|
||||
|
||||
|
||||
def file_sha3_256(x: str) -> str:
|
||||
with open(x, 'rb') as f:
|
||||
return hashlib.sha3_256(f.read()).hexdigest()
|
||||
|
||||
|
||||
def file_sha3_384(x: str) -> str:
|
||||
with open(x, 'rb') as f:
|
||||
return hashlib.sha3_384(f.read()).hexdigest()
|
||||
|
||||
|
||||
def file_sha3_512(x: str) -> str:
|
||||
with open(x, 'rb') as f:
|
||||
return hashlib.sha3_512(f.read()).hexdigest()
|
||||
|
||||
|
||||
@jinja2.pass_environment
|
||||
def shell_like_file_to_list(j2env, file_in: str) -> list:
|
||||
tpl = j2env.get_template(file_in)
|
||||
text = pathlib.Path(tpl.filename).read_text(encoding='utf-8')
|
||||
lines = re.split(r'[\r\n]', text)
|
||||
return list(itertools.filterfalse(
|
||||
lambda x: re.match(r'\s*#', x), lines
|
||||
))
|
||||
|
||||
|
||||
@jinja2.pass_context
|
||||
def header_rule_policy(j2ctx: jinja2.runtime.Context, header: str) -> str:
|
||||
DEFAULT_POLICY = 'deny'
|
||||
|
||||
header = http_header(header).lower()
|
||||
# "header" is now guaranteed to be non-empty and lowercase
|
||||
|
||||
x = j2ctx.resolve('j2cfg')
|
||||
if (x is jinja2.Undefined) or (x is None):
|
||||
print('header_rule_policy(): j2cfg is undefined in runtime context', file=sys.stderr)
|
||||
return DEFAULT_POLICY
|
||||
for k in ['headers', 'rules', header]:
|
||||
x = x.get(k)
|
||||
if x is None:
|
||||
break
|
||||
|
||||
if is_str(x):
|
||||
return x
|
||||
return DEFAULT_POLICY
|
||||
|
||||
|
||||
J2CFG_FUNCTIONS = J2CFG_BUILTIN_FUNCTIONS + [
|
||||
any_to_env_dict,
|
||||
any_to_str_list,
|
||||
as_cgi_hdr,
|
||||
as_ngx_var,
|
||||
cgi_header,
|
||||
dict_empty_keys,
|
||||
dict_keys,
|
||||
dict_non_empty_keys,
|
||||
dict_remap_keys,
|
||||
dict_to_env_str_list,
|
||||
dict_to_str_list,
|
||||
file_md5,
|
||||
file_sha1,
|
||||
file_sha256,
|
||||
file_sha384,
|
||||
file_sha512,
|
||||
file_sha3_256,
|
||||
file_sha3_384,
|
||||
file_sha3_512,
|
||||
from_gobool,
|
||||
is_mapping,
|
||||
http_header,
|
||||
is_map,
|
||||
is_re_fullmatch,
|
||||
is_re_match,
|
||||
is_sequence,
|
||||
is_seq,
|
||||
is_str,
|
||||
join_prefix,
|
||||
list_diff,
|
||||
list_intersect,
|
||||
md5,
|
||||
ngx_esc,
|
||||
ngx_var,
|
||||
non_empty_str,
|
||||
only_str,
|
||||
re_fullmatch,
|
||||
re_fullmatch_negate,
|
||||
re_fullmatch_neg,
|
||||
re_match,
|
||||
re_match_negate,
|
||||
re_match_neg,
|
||||
re_sub,
|
||||
remove_empty_str,
|
||||
remove_non_str,
|
||||
sha1,
|
||||
sha256,
|
||||
sha384,
|
||||
sha512,
|
||||
sha3_256,
|
||||
sha3_384,
|
||||
sha3_512,
|
||||
str_split_to_list,
|
||||
uniq,
|
||||
uniq_str_list,
|
||||
]
|
||||
|
||||
J2CFG_FILTERS = J2CFG_FUNCTIONS + [
|
||||
sh_like_file_to_list,
|
||||
header_rule_policy,
|
||||
shell_like_file_to_list,
|
||||
]
|
||||
|
@@ -1,16 +1,21 @@
|
||||
import re
|
||||
|
||||
|
||||
J2CFG_TEMPLATE_EXT = '.j2'
|
||||
|
||||
J2CFG_DEFAULTS_FILE = '/run/ngx/conf/j2cfg.yml'
|
||||
|
||||
J2CFG_PATH = [
|
||||
'/run/ngx/conf/j2cfg',
|
||||
]
|
||||
|
||||
J2CFG_CONFIG_EXT = [
|
||||
'.yml', '.yaml',
|
||||
'.json',
|
||||
'.toml',
|
||||
]
|
||||
|
||||
J2CFG_TEMPLATE_EXT = '.j2'
|
||||
|
||||
J2CFG_PYTHON_MODULES = [
|
||||
'datetime',
|
||||
'hashlib',
|
||||
'itertools',
|
||||
'json',
|
||||
'os',
|
||||
'os.path',
|
||||
'pathlib',
|
||||
@@ -69,10 +74,3 @@ J2CFG_BANNED_ENVS = [
|
||||
r'ENVSUBST_',
|
||||
r'J2CFG_',
|
||||
]
|
||||
|
||||
|
||||
def is_env_banned(k: str) -> bool:
|
||||
for r in J2CFG_BANNED_ENVS:
|
||||
if re.match(r, k):
|
||||
return True
|
||||
return False
|
||||
|
@@ -1,173 +1,162 @@
|
||||
j2cfg:
|
||||
{{ j2cfg }}
|
||||
{% set x = '123' %}
|
||||
x = {{ repr(x) }}
|
||||
ngx_esc(x): {{ x | ngx_esc }}
|
||||
{% set x = '1 23' %}
|
||||
x = {{ repr(x) }}
|
||||
ngx_esc(x): {{ x | ngx_esc }}
|
||||
{% set x = '' %}
|
||||
x = {{ repr(x) }}
|
||||
ngx_esc(x): {{ x | ngx_esc }}
|
||||
{% set x = [1,2,3,4] %}
|
||||
x = {{ x }}
|
||||
ngx_esc(x): {{ x | ngx_esc }}
|
||||
{% set x = {1:2,3:4} %}
|
||||
x = {{ x }}
|
||||
ngx_esc(x): {{ x | ngx_esc }}
|
||||
|
||||
{% set x = '123' %}
|
||||
x = {{ repr(x) }}
|
||||
is_str(x): {{ x | is_str }}
|
||||
is_seq(x): {{ x | is_seq }}
|
||||
is_map(x): {{ x | is_map }}
|
||||
any_to_str_list(x): {{ x | any_to_str_list }}
|
||||
|
||||
{% set x = [1,2,3,4] %}
|
||||
x = {{ x }}
|
||||
is_sequence:
|
||||
{{ x | is_sequence }}
|
||||
is_str(x): {{ x | is_str }}
|
||||
is_seq(x): {{ x | is_seq }}
|
||||
is_map(x): {{ x | is_map }}
|
||||
any_to_str_list(x): {{ x | any_to_str_list }}
|
||||
|
||||
{% set x = {1:2,3:4} %}
|
||||
x = {{ x }}
|
||||
is_sequence:
|
||||
{{ x | is_sequence }}
|
||||
|
||||
{% set x = [1,2,3,4] %}
|
||||
x = {{ x }}
|
||||
is_mapping:
|
||||
{{ x | is_mapping }}
|
||||
|
||||
{% set x = {1:2,3:4} %}
|
||||
x = {{ x }}
|
||||
is_mapping:
|
||||
{{ x | is_mapping }}
|
||||
is_str(x): {{ x | is_str }}
|
||||
is_seq(x): {{ x | is_seq }}
|
||||
is_map(x): {{ x | is_map }}
|
||||
any_to_str_list(x): {{ x | any_to_str_list }}
|
||||
|
||||
{% set x = [2,3,1,2] %}
|
||||
x = {{ x }}
|
||||
uniq:
|
||||
{{ x | uniq }}
|
||||
uniq(x): {{ x | uniq }}
|
||||
|
||||
{% set x = ['2',3,'1','2'] %}
|
||||
x = {{ x }}
|
||||
remove_non_str:
|
||||
{{ x | remove_non_str }}
|
||||
only_str(x): {{ x | only_str }}
|
||||
|
||||
{% set x = ['2','','1','2'] %}
|
||||
x = {{ x }}
|
||||
remove_empty_str:
|
||||
{{ x | remove_empty_str }}
|
||||
non_empty_str(x): {{ x | non_empty_str }}
|
||||
|
||||
{% set x = ['2','3','1','2'] %}
|
||||
x = {{ x }}
|
||||
uniq_str_list:
|
||||
{{ x | uniq_str_list }}
|
||||
uniq_str_list(x): {{ x | uniq_str_list }}
|
||||
|
||||
{% set x = '2 3 1 2 ' %}
|
||||
x = {{ x.__repr__() }}
|
||||
str_split_to_list:
|
||||
{{ x | str_split_to_list }}
|
||||
x = {{ repr(x) }}
|
||||
str_split_to_list(x): {{ x | str_split_to_list }}
|
||||
|
||||
{% set x = '2:3::1:2:' %}
|
||||
x = {{ x.__repr__() }}
|
||||
str_split_to_list(':'):
|
||||
{{ x | str_split_to_list(':') }}
|
||||
x = {{ repr(x) }}
|
||||
str_split_to_list(x, ':'): {{ x | str_split_to_list(':') }}
|
||||
|
||||
{% set x = { 'VAR1': 'Etc/UTC', 'VAR2': '', 'VAR3': None, '4VAR4': 'yeah', 'VAR5=not': 'yeah', 'VAR5=real yeah': None, 'VAR6': {'pi': 3.1415926}, 'VAR7': ['pi', 3.1415926] } %}
|
||||
x = {{ x }}
|
||||
dict_to_env_str_list:
|
||||
{{ x | dict_to_env_str_list }}
|
||||
|
||||
{% set x = '1 2 3 4' %}
|
||||
x = {{ x.__repr__() }}
|
||||
any_to_str_list:
|
||||
{{ x | any_to_str_list }}
|
||||
|
||||
{% set x = [1,2,3,4] %}
|
||||
x = {{ x }}
|
||||
any_to_str_list:
|
||||
{{ x | any_to_str_list }}
|
||||
dict_keys(x): {{ x | dict_keys }}
|
||||
dict_empty_keys(x): {{ x | dict_empty_keys }}
|
||||
dict_non_empty_keys(x): {{ x | dict_non_empty_keys }}
|
||||
dict_to_str_list(x): {{ x | dict_to_str_list }}
|
||||
any_to_str_list(x): {{ x | any_to_str_list }}
|
||||
any_to_env_dict(x): {{ x | any_to_env_dict }}
|
||||
|
||||
{% set x = 3.1415926 %}
|
||||
x = {{ x }}
|
||||
any_to_str_list:
|
||||
{{ x | any_to_str_list }}
|
||||
any_to_str_list(x): {{ x | any_to_str_list }}
|
||||
|
||||
{% set x = ['a2','b3','c1','d2'] %}
|
||||
x = {{ x }}
|
||||
is_re_match('[ab]'):
|
||||
{{ x | is_re_match('[ab]') }}
|
||||
is_re_match('[mn]'):
|
||||
{{ x | is_re_match('[mn]') }}
|
||||
|
||||
{% set x = ['a2','b3','c1','d2'] %}
|
||||
x = {{ x }}
|
||||
is_re_fullmatch('[ab]'):
|
||||
{{ x | is_re_fullmatch('[ab]') }}
|
||||
is_re_fullmatch('[ab][12]'):
|
||||
{{ x | is_re_fullmatch('[ab][12]') }}
|
||||
|
||||
{% set x = ['a2','b3','c1','d2'] %}
|
||||
x = {{ x }}
|
||||
re_match('[ab]'):
|
||||
{{ x | re_match('[ab]') }}
|
||||
re_match('[mn]'):
|
||||
{{ x | re_match('[mn]') }}
|
||||
|
||||
{% set x = ['a2','b3','c1','d2'] %}
|
||||
x = {{ x }}
|
||||
re_fullmatch('[ab]'):
|
||||
{{ x | re_fullmatch('[ab]') }}
|
||||
re_fullmatch('[ab][12]'):
|
||||
{{ x | re_fullmatch('[ab][12]') }}
|
||||
|
||||
{% set x = ['a2','b3','c1','d2'] %}
|
||||
x = {{ x }}
|
||||
re_match_negate('[ab]'):
|
||||
{{ x | re_match_negate('[ab]') }}
|
||||
re_match_negate('[mn]'):
|
||||
{{ x | re_match_negate('[mn]') }}
|
||||
|
||||
{% set x = ['a2','b3','c1','d2'] %}
|
||||
x = {{ x }}
|
||||
re_fullmatch_negate('[ab]'):
|
||||
{{ x | re_fullmatch_negate('[ab]') }}
|
||||
re_fullmatch_negate('[ab][12]'):
|
||||
{{ x | re_fullmatch_negate('[ab][12]') }}
|
||||
is_re_match(x, '[ab]'): {{ x | is_re_match('[ab]') }}
|
||||
is_re_match(x, '[mn]'): {{ x | is_re_match('[mn]') }}
|
||||
is_re_fullmatch(x, '[ab]'): {{ x | is_re_fullmatch('[ab]') }}
|
||||
is_re_fullmatch(x, '[ab][12]'): {{ x | is_re_fullmatch('[ab][12]') }}
|
||||
re_match(x, '[ab]'): {{ x | re_match('[ab]') }}
|
||||
re_match(x, '[mn]'): {{ x | re_match('[mn]') }}
|
||||
re_fullmatch(x, '[ab]'): {{ x | re_fullmatch('[ab]') }}
|
||||
re_fullmatch(x, '[ab][12]'): {{ x | re_fullmatch('[ab][12]') }}
|
||||
re_match_neg(x, '[ab]'): {{ x | re_match_neg('[ab]') }}
|
||||
re_match_neg(x, '[mn]'): {{ x | re_match_neg('[mn]') }}
|
||||
re_fullmatch_neg(x, '[ab]'): {{ x | re_fullmatch_neg('[ab]') }}
|
||||
re_fullmatch_neg(x, '[ab][12]'): {{ x | re_fullmatch_neg('[ab][12]') }}
|
||||
|
||||
{% set x = ['a2b','b3b','c1f','d2g'] %}
|
||||
x = {{ x }}
|
||||
re_sub('[ab]', '_'):
|
||||
{{ x | re_sub('[ab]', '_') }}
|
||||
re_sub('[mn]', '_'):
|
||||
{{ x | re_sub('[mn]', '_') }}
|
||||
|
||||
{% set x = 'settings.py' %}
|
||||
x = {{ x.__repr__() }}
|
||||
sh_like_file_to_list:
|
||||
{{ 'settings.py' | sh_like_file_to_list }}
|
||||
re_sub(x, '[ab]', '_'): {{ x | re_sub('[ab]', '_') }}
|
||||
re_sub(x, '[mn]', '_'): {{ x | re_sub('[mn]', '_') }}
|
||||
|
||||
{% set x = 'Accept-Encoding' %}
|
||||
x = {{ x.__repr__() }}
|
||||
as_cgi_hdr:
|
||||
{{ x | as_cgi_hdr }}
|
||||
x = {{ repr(x) }}
|
||||
cgi_header(x): {{ x | cgi_header }}
|
||||
http_header(x): {{ x | http_header }}
|
||||
|
||||
{% set x = '_Permissions-Policy--' %}
|
||||
x = {{ x.__repr__() }}
|
||||
as_cgi_hdr:
|
||||
{{ x | as_cgi_hdr }}
|
||||
x = {{ repr(x) }}
|
||||
cgi_header(x): {{ x | cgi_header }}
|
||||
http_header(x): {{ x | http_header }}
|
||||
|
||||
{% set x = '@@Content__Type@@' %}
|
||||
x = {{ repr(x) }}
|
||||
cgi_header(x): {{ x | cgi_header }}
|
||||
http_header(x): {{ x | http_header }}
|
||||
|
||||
{% set x = 'proxy-type' %}
|
||||
x = {{ repr(x) }}
|
||||
ngx_var(x): {{ x | ngx_var }}
|
||||
ngx_var(x, 'my'): {{ x | ngx_var('my') }}
|
||||
|
||||
{% set x = 'VAR1=Etc/UTC' %}
|
||||
x = {{ x.__repr__() }}
|
||||
any_to_env_dict:
|
||||
{{ x | any_to_env_dict }}
|
||||
x = {{ repr(x) }}
|
||||
any_to_env_dict(x): {{ x | any_to_env_dict }}
|
||||
|
||||
{% set x = ['VAR1=Etc/UTC', 'VAR2=', 'VAR3', '4VAR4=yeah', 'VAR5=yeah', 'VAR5=not-yeah'] %}
|
||||
x = {{ x }}
|
||||
any_to_env_dict:
|
||||
{{ x | any_to_env_dict }}
|
||||
|
||||
{% set x = { 'VAR1': 'Etc/UTC', 'VAR2': '', 'VAR3': None, '4VAR4': 'yeah', 'VAR5=not': 'yeah', 'VAR5=real yeah': None, 'VAR6': {'pi': 3.1415926}, 'VAR7': ['pi', 3.1415926] } %}
|
||||
x = {{ x }}
|
||||
any_to_env_dict:
|
||||
{{ x | any_to_env_dict }}
|
||||
|
||||
{% set x = { 'VAR1': 'Etc/UTC', 'VAR2': '', 'VAR3': None, '4VAR4': 'yeah', 'VAR5=not': 'yeah', 'VAR5=real yeah': None, 'VAR6': {'pi': 3.1415926}, 'VAR7': ['pi', 3.1415926] } %}
|
||||
x = {{ x }}
|
||||
dict_keys:
|
||||
{{ x | dict_keys }}
|
||||
dict_empty_keys:
|
||||
{{ x | dict_empty_keys }}
|
||||
dict_non_empty_keys:
|
||||
{{ x | dict_non_empty_keys }}
|
||||
any_to_env_dict(x): {{ x | any_to_env_dict }}
|
||||
|
||||
{% set x = [1,2,3,4] %}
|
||||
{% set y = [3,4,5,6] %}
|
||||
x = {{ x }}
|
||||
y = {{ y }}
|
||||
list_diff(x, y):
|
||||
{{ x | list_diff(y) }}
|
||||
list_diff(y, x):
|
||||
{{ y | list_diff(x) }}
|
||||
list_intersect(x, y):
|
||||
{{ x | list_intersect(y) }}
|
||||
list_intersect(y, x):
|
||||
{{ y | list_intersect(x) }}
|
||||
list_diff(x, y): {{ x | list_diff(y) }}
|
||||
list_diff(y, x): {{ y | list_diff(x) }}
|
||||
list_intersect(x, y): {{ x | list_intersect(y) }}
|
||||
list_intersect(y, x): {{ y | list_intersect(x) }}
|
||||
|
||||
{% set x = 'settings.py' %}
|
||||
x = {{ repr(x) }}
|
||||
md5(x): {{ x | md5 }}
|
||||
sha1(x): {{ x | sha1 }}
|
||||
sha256(x): {{ x | sha256 }}
|
||||
sha384(x): {{ x | sha384 }}
|
||||
sha512(x): {{ x | sha512 }}
|
||||
sha3_256(x): {{ x | sha3_256 }}
|
||||
sha3_384(x): {{ x | sha3_384 }}
|
||||
sha3_512(x): {{ x | sha3_512 }}
|
||||
file_md5(x): {{ x | file_md5 }}
|
||||
file_sha1(x): {{ x | file_sha1 }}
|
||||
file_sha256(x): {{ x | file_sha256 }}
|
||||
file_sha384(x): {{ x | file_sha384 }}
|
||||
file_sha512(x): {{ x | file_sha512 }}
|
||||
file_sha3_256(x): {{ x | file_sha3_256 }}
|
||||
file_sha3_384(x): {{ x | file_sha3_384 }}
|
||||
file_sha3_512(x): {{ x | file_sha3_512 }}
|
||||
|
||||
{% set x = '/topdir' %}
|
||||
x = {{ repr(x) }}
|
||||
join_prefix(x, 'access.log'): {{ join_prefix(x, 'access.log') }}
|
||||
join_prefix(x, './access.log'): {{ join_prefix(x, './access.log') }}
|
||||
join_prefix(x, '/access.log'): {{ join_prefix(x, '/access.log') }}
|
||||
join_prefix(x, '../access.log'): {{ join_prefix(x, '../access.log') }}
|
||||
|
||||
{% set x = 'settings.py' %}
|
||||
x = {{ repr(x) }}
|
||||
shell_like_file_to_list(x): {{ x | shell_like_file_to_list }}
|
||||
|
||||
{# end of tests #}
|
||||
|
Reference in New Issue
Block a user