import collections.abc
import itertools
import os.path
import pathlib
import re
import sys

import jinja2

from .settings import is_env_banned


def is_sequence(x) -> bool:
    if isinstance(x, str):
        return False
    return isinstance(x, collections.abc.Sequence)


def is_mapping(x) -> bool:
    return isinstance(x, collections.abc.Mapping)
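
# Illustrative examples (editor's addition, not part of the original module):
#   is_sequence('abc')       -> False   (strings are deliberately excluded)
#   is_sequence(['a', 'b'])  -> True
#   is_mapping({'k': 'v'})   -> True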


def uniq(a: list | set) -> list:
    return sorted(set(a))


def remove_non_str(a: list | set) -> list:
    return list(filter(lambda x: isinstance(x, str), a))


def remove_empty_str(a: list | set) -> list:
    return list(filter(None, a))


def uniq_str_list(a: list | set) -> list:
    return remove_empty_str(uniq(a))
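
# Illustrative examples (editor's addition): uniq() both de-duplicates and
# sorts; remove_empty_str() drops falsy entries such as '' and None:
#   uniq(['b', 'a', 'b'])          -> ['a', 'b']
#   uniq_str_list(['x', '', 'x'])  -> ['x']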


def str_split_to_list(s: str, sep=r'\s+') -> list:
    return remove_empty_str(re.split(sep, s))
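
# Illustrative example (editor's addition):
#   str_split_to_list('  foo   bar ')  -> ['foo', 'bar']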


def dict_to_env_str_list(x: dict) -> list:
    r = []
    for k in sorted(x.keys()):
        if x[k] is None:
            r.append(f'{k}')
        else:
            r.append(f'{k}={str(x[k])}')
    return r
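
# Illustrative example (editor's addition): a None value marks a "bare" entry
# without '=VALUE':
#   dict_to_env_str_list({'B': 1, 'A': None})  -> ['A', 'B=1']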


def any_to_str_list(x) -> list:
    if x is None:
        return []
    if isinstance(x, str):
        return [x]
    if is_sequence(x):
        return [str(e) for e in x]
    if is_mapping(x):
        return dict_to_env_str_list(x)
    return [str(x)]
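
# Illustrative examples (editor's addition):
#   any_to_str_list(None)         -> []
#   any_to_str_list('x')          -> ['x']
#   any_to_str_list([1, 2])       -> ['1', '2']
#   any_to_str_list({'A': None})  -> ['A']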


def is_re_match(x, pattern, flags=0) -> bool:
    if isinstance(x, str):
        return bool(re.match(pattern, x, flags))
    if is_sequence(x):
        return any(is_re_match(v, pattern, flags) for v in x)
    if is_mapping(x):
        return any(is_re_match(v, pattern, flags) for v in x.keys())
    return False


def is_re_fullmatch(x, pattern, flags=0) -> bool:
    if isinstance(x, str):
        return bool(re.fullmatch(pattern, x, flags))
    if is_sequence(x):
        return any(is_re_fullmatch(v, pattern, flags) for v in x)
    if is_mapping(x):
        return any(is_re_fullmatch(v, pattern, flags) for v in x.keys())
    return False
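
# Illustrative examples (editor's addition): for sequences and mappings these
# report whether any element (or key) matches:
#   is_re_match(['foo', 'bar'], 'ba')    -> True
#   is_re_fullmatch({'bar': 1}, r'b.*')  -> True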


def re_match(x, pattern, flags=0):
    if isinstance(x, str):
        return re.match(pattern, x, flags)
    if is_sequence(x):
        return [v for v in x
                if re_match(v, pattern, flags)]
    if is_mapping(x):
        return {k: v for k, v in x.items()
                if re_match(k, pattern, flags)}
    return None


def re_fullmatch(x, pattern, flags=0):
    if isinstance(x, str):
        return re.fullmatch(pattern, x, flags)
    if is_sequence(x):
        return [v for v in x
                if re_fullmatch(v, pattern, flags)]
    if is_mapping(x):
        return {k: v for k, v in x.items()
                if re_fullmatch(k, pattern, flags)}
    return None
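
# Illustrative examples (editor's addition): unlike the is_re_*() predicates,
# these return the matching subset (or a match object for a plain string):
#   re_match(['foo', 'bar', 'baz'], 'ba')     -> ['bar', 'baz']
#   re_fullmatch({'a1': 1, 'b2': 2}, r'a\d')  -> {'a1': 1}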


def re_match_negate(x, pattern, flags=0):
    if isinstance(x, str):
        return not bool(re.match(pattern, x, flags))
    if is_sequence(x):
        return [v for v in x
                if re_match_negate(v, pattern, flags)]
    if is_mapping(x):
        return {k: v for k, v in x.items()
                if re_match_negate(k, pattern, flags)}
    return x


def re_fullmatch_negate(x, pattern, flags=0):
    if isinstance(x, str):
        return not bool(re.fullmatch(pattern, x, flags))
    if is_sequence(x):
        return [v for v in x
                if re_fullmatch_negate(v, pattern, flags)]
    if is_mapping(x):
        return {k: v for k, v in x.items()
                if re_fullmatch_negate(k, pattern, flags)}
    return x


def dict_remap_keys(x: dict, key_map) -> dict:
    if key_map is None:
        return x
    p = set(x.keys())
    m = {}
    for k in x:
        v = key_map(k)
        if v == k:
            continue
        m[k] = v
        p.discard(k)
        p.discard(v)
    return {k: x[k] for k in p} | {v: x[k] for k, v in m.items()}
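
# Illustrative example (editor's addition): keys are rewritten via key_map();
# an existing key that collides with a remapped key is dropped in favour of
# the remapped entry:
#   dict_remap_keys({'a': 1, 'b': 2}, str.upper)  -> {'A': 1, 'B': 2}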


def re_sub(x, pattern, repl, count=0, flags=0):
    if isinstance(x, str):
        # pass count/flags by keyword; positional use is deprecated in re.sub()
        return re.sub(pattern, repl, x, count=count, flags=flags)
    if is_sequence(x):
        return [
            re_sub(v, pattern, repl, count, flags)
            for v in x
        ]
    if is_mapping(x):
        return dict_remap_keys(
            x, lambda k:
            re_sub(k, pattern, repl, count, flags)
        )
    return x
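
# Illustrative examples (editor's addition):
#   re_sub(['foo1', 'bar2'], r'\d+', '')  -> ['foo', 'bar']
#   re_sub({'a-b': 1}, '-', '_')          -> {'a_b': 1}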


def as_cgi_hdr(x):
    if isinstance(x, str):
        return 'HTTP_' + re.sub('[^A-Z0-9]+', '_', x.upper()).strip('_')
    if is_sequence(x):
        return uniq([
            as_cgi_hdr(v)
            for v in x
        ])
    if is_mapping(x):
        return dict_remap_keys(
            x, as_cgi_hdr
        )
    return x
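
# Illustrative examples (editor's addition):
#   as_cgi_hdr('X-Forwarded-For')        -> 'HTTP_X_FORWARDED_FOR'
#   as_cgi_hdr(['Accept', 'X-Real-IP'])  -> ['HTTP_ACCEPT', 'HTTP_X_REAL_IP']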


def as_ngx_var(x, pfx='custom'):
    if isinstance(x, str):
        parts = remove_empty_str(
            [re.sub('[^a-z0-9]+', '_', str(i).lower()).strip('_')
             for i in (pfx, x)]
        )
        if len(parts) < 2:
            print(
                f'as_ngx_var: parts={parts}',
                file=sys.stderr
            )
            raise ValueError('as_ngx_var: incomplete string array')
        return '$' + '_'.join(parts)
    if is_sequence(x):
        return uniq([
            as_ngx_var(v, pfx)
            for v in x
        ])
    if is_mapping(x):
        return dict_remap_keys(
            x, lambda k:
            as_ngx_var(k, pfx)
        )
    return x
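
# Illustrative examples (editor's addition):
#   as_ngx_var('X-Real-IP')              -> '$custom_x_real_ip'
#   as_ngx_var('Auth-Token', pfx='hdr')  -> '$hdr_auth_token'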


def any_to_env_dict(x) -> dict:
    if x is None:
        return {}

    h = {}

    def feed(k, parse=False, v=None):
        # skip entries without a key; a value of None is kept and marks a
        # "bare" variable (name only, no value)
        if k is None:
            return
        k = str(k)
        if parse:
            k2, m, v2 = k.partition('=')
            if m == '=':
                k = k2
                v = v2
        if not re.fullmatch(r'[a-zA-Z_][a-zA-Z0-9_]*', k):
            return
        if is_env_banned(k):
            return
        if k in h:
            return
        h[k] = v if v is None else str(v)

    if isinstance(x, str):
        feed(x, True)
    elif is_sequence(x):
        for e in x:
            feed(e, True)
    elif is_mapping(x):
        for k in x:
            feed(k, False, x[k])
    else:
        return {}

    return h
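
# Illustrative examples (editor's addition), assuming neither name is rejected
# by is_env_banned():
#   any_to_env_dict(['FOO=bar', 'BAZ'])  -> {'FOO': 'bar', 'BAZ': None}
#   any_to_env_dict({'1BAD': 'x'})       -> {}    (invalid variable name)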


def dict_keys(x: dict) -> list:
    return sorted([k for k in x.keys()])


def dict_empty_keys(x: dict) -> list:
    return sorted([k for k in x.keys() if x[k] is None])


def dict_non_empty_keys(x: dict) -> list:
    return sorted([k for k in x.keys() if x[k] is not None])


def list_diff(a: list | set, b: list | set) -> list:
    return list(set(a) - set(b))


def list_intersect(a: list | set, b: list | set) -> list:
    return list(set(a) & set(b))


@jinja2.pass_environment
def sh_like_file_to_list(j2env, file_in: str) -> list:
    tpl = j2env.get_template(file_in)
    text = pathlib.Path(tpl.filename).read_text(encoding='utf-8')
    lines = re.split(r'[\r\n]', text)
    return list(itertools.filterfalse(
        lambda x: re.match(r'\s*#', x), lines
    ))
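
# Note (editor's addition): sh_like_file_to_list() resolves file_in through the
# Jinja2 loader, reads the file's raw source from disk, and drops lines whose
# first non-whitespace character is '#'; blank lines are kept.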


def ngx_esc(x):
    if isinstance(x, str):
        if x == "":
            return "''"
        if re.search(r'(?:\s|[;{}()\[\]\\\'"*?])', x):
            return repr(x)
        return x
    if is_sequence(x):
        return uniq([
            ngx_esc(v)
            for v in x
        ])
    if is_mapping(x):
        return dict_remap_keys(
            x, ngx_esc
        )
    if x is None:
        return None
    return ngx_esc(str(x))
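
# Illustrative examples (editor's addition): strings containing whitespace or
# nginx-special characters are quoted via Python's repr():
#   ngx_esc('plain')  -> 'plain'
#   ngx_esc('a b')    -> "'a b'"
#   ngx_esc('')       -> "''"
#   ngx_esc(42)       -> '42'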


def from_gobool(x) -> bool:
    if isinstance(x, str):
        return x.lower() in {'1', 't', 'true'}
    return bool(x)
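
# Illustrative examples (editor's addition): for strings, only '1', 't' and
# 'true' (case-insensitive) count as true; everything else is false:
#   from_gobool('True')  -> True
#   from_gobool('yes')   -> False
#   from_gobool(1)       -> True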


def merge_dict_recurse(d1, d2: dict) -> dict:
    x = {} | d1

    keys1 = set(x.keys())
    keys2 = set(d2.keys())
    common = keys1 & keys2
    missing = keys2 - common

    map1 = {k for k in common if is_mapping(x.get(k))}
    seq1 = {k for k in common if is_sequence(x.get(k))}
    misc1 = common - seq1 - map1

    merge_safe = missing | misc1
    x.update({k: d2.get(k) for k in merge_safe})

    map_common = {k for k in map1 if is_mapping(d2.get(k))}
    for k in map_common:
        y = d2.get(k)
        if not y:
            x[k] = {}
            continue
        x[k] = merge_dict_recurse(x.get(k), y)

    seq_common = {k for k in seq1 if is_sequence(d2.get(k))}
    for k in seq_common:
        y = d2.get(k)
        if not y:
            x[k] = []
            continue
        x[k] = uniq(list(x.get(k)) + list(y))

    unmerged = (map1 - map_common) | (seq1 - seq_common)
    for k in unmerged:
        t1 = type(x.get(k))
        t2 = type(d2.get(k))
        print(
            f'merge_dict_recurse(): skipping key {k}'
            + f' due to type mismatch: {t1} vs. {t2}',
            file=sys.stderr)

    return x
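
# Illustrative example (editor's addition): scalar values from d2 win, nested
# mappings are merged recursively, and sequences are unioned (de-duplicated and
# sorted via uniq()); keys whose container types disagree keep d1's value:
#   merge_dict_recurse({'a': [1], 'b': {'x': 1}, 'c': 1},
#                      {'a': [2], 'b': {'y': 2}, 'c': 3})
#   -> {'a': [1, 2], 'b': {'x': 1, 'y': 2}, 'c': 3}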


def join_prefix(prefix: str, *paths) -> str:
    pfx = prefix or '/'
    pfx = '/' + pfx.strip('/')
    rv = os.path.normpath(os.path.join(pfx, *paths).rstrip('/')).rstrip('/')
    if rv == pfx:
        raise ValueError('join_prefix: empty path after prefix')
    common = os.path.commonpath([pfx, rv])
    if common == pfx:
        return rv
    # slowpath
    rv = rv.removeprefix(common).lstrip('/')
    rv = os.path.join(pfx, rv)
    return rv
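
# Illustrative examples (editor's addition): the result is kept under the
# prefix even if the joined path would escape it:
#   join_prefix('/app', 'static', 'css')  -> '/app/static/css'
#   join_prefix('/app', '../etc')         -> '/app/etc'
#   join_prefix('', 'index.html')         -> '/index.html'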


J2CFG_FILTERS = [
    any_to_env_dict,
    any_to_str_list,
    as_cgi_hdr,
    as_ngx_var,
    dict_empty_keys,
    dict_keys,
    dict_non_empty_keys,
    dict_remap_keys,
    dict_to_env_str_list,
    from_gobool,
    is_mapping,
    is_re_fullmatch,
    is_re_match,
    is_sequence,
    join_prefix,
    list_diff,
    list_intersect,
    ngx_esc,
    re_fullmatch,
    re_fullmatch_negate,
    re_match,
    re_match_negate,
    re_sub,
    remove_empty_str,
    remove_non_str,
    sh_like_file_to_list,
    str_split_to_list,
    uniq,
    uniq_str_list,
]