2022-09-11 06:15:03 +02:00
|
|
|
from __future__ import annotations
|
|
|
|
|
2023-04-16 03:28:33 +02:00
|
|
|
import logging
|
2023-03-27 09:05:30 +02:00
|
|
|
import toml
|
|
|
|
|
2022-08-30 01:37:09 +02:00
|
|
|
from munch import Munch
|
2023-03-27 09:05:30 +02:00
|
|
|
from toml.encoder import TomlEncoder, TomlPreserveInlineDictEncoder
|
|
|
|
from typing import ClassVar, Generator, Optional, Union, Mapping, Any, get_type_hints, get_origin, get_args, Iterable
|
2023-03-21 20:53:17 +01:00
|
|
|
|
2023-04-16 20:48:48 +00:00
|
|
|
from typehelpers import UnionType, NoneType
|
2022-08-30 01:37:09 +02:00
|
|
|
|
|
|
|
|
2023-03-21 20:53:17 +01:00
|
|
|
def resolve_type_hint(hint: type, ignore_origins: list[type] = []) -> Iterable[type]:
|
2022-08-30 01:37:09 +02:00
|
|
|
origin = get_origin(hint)
|
|
|
|
args: Iterable[type] = get_args(hint)
|
2023-03-21 20:53:17 +01:00
|
|
|
if origin in ignore_origins:
|
|
|
|
return [hint]
|
2022-08-30 01:37:09 +02:00
|
|
|
if origin is Optional:
|
2023-03-21 20:53:17 +01:00
|
|
|
args = set(list(args) + [NoneType])
|
2022-12-07 13:21:52 +01:00
|
|
|
if origin in [Union, UnionType, Optional]:
|
2022-08-30 03:12:39 +02:00
|
|
|
results: list[type] = []
|
2022-08-30 01:37:09 +02:00
|
|
|
for arg in args:
|
2023-03-21 20:53:17 +01:00
|
|
|
results += resolve_type_hint(arg, ignore_origins=ignore_origins)
|
2022-08-30 01:37:09 +02:00
|
|
|
return results
|
|
|
|
return [origin or hint]
|
|
|
|
|
|
|
|
|
2023-03-27 09:05:30 +02:00
|
|
|
def flatten_hints(hints: Any) -> Generator[Any, None, None]:
    """
    Yield the leaves of an arbitrarily nested list/tuple structure, depth-first.

    Anything that is not a ``list`` or ``tuple`` counts as a leaf and is yielded
    unchanged, so a single non-sequence argument yields exactly itself.
    """
    if isinstance(hints, (list, tuple)):
        for entry in hints:
            yield from flatten_hints(entry)
    else:
        yield hints
|
|
|
|
|
|
|
|
|
|
|
|
def resolve_dict_hints(hints: Any) -> Generator[tuple[Any, ...], None, None]:
    """
    Yield the type arguments of every ``dict[...]`` hint found in `hints`.

    `hints` may be a single hint or a nested list/tuple of hints. Union-like
    hints that carry arguments are searched recursively; every other hint is
    skipped. Each yielded item is the ``get_args()`` tuple of one dict hint.
    """
    union_like = (NoneType, Optional, Union, UnionType)
    for candidate in flatten_hints(hints):
        origin = get_origin(candidate)
        params = get_args(candidate)
        if origin == dict:
            yield params
        elif origin in union_like and params:
            yield from resolve_dict_hints(params)
|
|
|
|
|
|
|
|
|
2023-04-17 02:32:28 +02:00
|
|
|
class DictScheme(Munch):
    """
    A Munch whose subclasses describe their contents with class annotations.

    ``__init_subclass__()`` collects each subclass' non-ClassVar annotations into
    ``_type_hints``. ``update()`` (and therefore ``__init__()``) passes incoming
    values through ``transform()``, which coerces and optionally validates them
    against those hints. ``toDict()``/``strip_dict()`` convert back to plain
    dicts, optionally dropping hidden (``_``-prefixed) keys and ``None`` values.
    """

    # Field name -> type hint. Only assigned in __init_subclass__(), so the
    # DictScheme base class itself carries no value for it.
    _type_hints: ClassVar[dict[str, Any]]
    # Class-wide default for strip_dict(): drop keys starting with '_'.
    _strip_hidden: ClassVar[bool] = False
    # Class-wide default for strip_dict(): drop keys whose value is None.
    _sparse: ClassVar[bool] = False

    def __init__(self, d: Optional[Mapping] = None, validate: bool = True, **kwargs):
        """
        Populate the instance from `d` merged with `kwargs` (kwargs win).

        :param d: initial values; any Mapping is accepted, None means empty.
        :param validate: forwarded to update().
        """
        # Copy into a real dict first: arbitrary Mappings need not support `|`.
        initial = {} if d is None else dict(d)
        self.update(initial | kwargs, validate=validate)

    @classmethod
    def transform(
        cls,
        values: Mapping[str, Any],
        validate: bool = True,
        allow_extra: bool = False,
        type_hints: Optional[dict[str, Any]] = None,
    ) -> Any:
        """
        Coerce `values` according to type hints and return them as a new dict.

        For keys that have a hint:
          * ``None`` is kept as-is when the hint allows ``None``.
          * If the first resolved class is a dict subclass, the value is
            converted to that class; for plain ``dict[K, V]`` hints the content
            is transformed recursively with ``V`` as the hint for every subkey.
          * Strings are parsed into ``int``/``float`` when the hint allows a
            number but not ``str``.
          * With `validate`, the final value must be an instance of one of the
            resolved classes.
        Keys without a hint raise when `validate` is set and `allow_extra` is
        not; otherwise they are kept, with plain dicts converted to Munch.

        :param values: the raw key/value pairs; not modified.
        :param validate: raise on mismatches instead of logging a warning.
        :param allow_extra: tolerate keys that have no type hint.
        :param type_hints: hints to use instead of ``cls._type_hints``.
        :raises Exception: on validation failures (only when `validate` is set).
        :raises AssertionError: if a dict-typed key receives a non-dict value or
            the target class for a dict value is not a Munch subclass.
        """
        results: dict[str, Any] = {}
        for key, value in dict(values).items():
            # Resolved inside the loop on purpose: the DictScheme base class has
            # no `_type_hints` value, and empty input must not touch it.
            type_hints = cls._type_hints if type_hints is None else type_hints
            if key in type_hints:
                _classes = tuple[type](resolve_type_hint(type_hints[key]))
                optional = bool({NoneType, None}.intersection(_classes))
                if optional and value is None:
                    results[key] = None
                    continue
                if issubclass(_classes[0], dict):
                    assert isinstance(value, dict) or (optional and value is None), f'{key=} is not dict: {value!r}, {_classes=}'
                    target_class = _classes[0]
                    if target_class in [None, NoneType, Optional]:
                        # pick the first hinted class that isn't a None-marker
                        for target in _classes[1:]:
                            if target not in [None, NoneType, Optional]:
                                target_class = target
                                break
                    if target_class is dict:
                        dict_hints = list(resolve_dict_hints(type_hints[key]))
                        if len(dict_hints) != 1:
                            msg = f"transform(): Received wrong amount of type hints for key {key}: {len(dict_hints)}"
                            if validate:
                                raise Exception(msg)
                            logging.warning(msg)
                        if len(dict_hints) == 1 and value is not None:
                            if len(dict_hints[0]) != 2 or not all(dict_hints[0]):
                                logging.debug(f"Weird dict hints received: {dict_hints}")
                                # NOTE: the key is dropped from the results here
                                continue
                            key_type, value_type = dict_hints[0]
                            if not isinstance(value, Mapping):
                                msg = f"Got non-mapping {value!r} for expected dict type: {key_type} => {value_type}. Allowed classes: {_classes}"
                                if validate:
                                    raise Exception(msg)
                                logging.warning(msg)
                                results[key] = value
                                continue
                            if isinstance(key_type, type):
                                if issubclass(key_type, str):
                                    # str-keyed dicts become attribute-accessible Munches
                                    target_class = Munch
                                else:
                                    msg = f"{key=} subdict got wrong key type hint (expected str): {key_type}"
                                    if validate:
                                        raise Exception(msg)
                                    logging.warning(msg)
                            if validate:
                                for k in value:
                                    if not isinstance(k, tuple(flatten_hints(key_type))):
                                        raise Exception(f'Subdict "{key}": wrong type for subkey "{k}": got: {type(k)}, expected: {key_type}')
                            # every subkey shares the dict's declared value type
                            dict_content_hints = {k: value_type for k in value}
                            value = cls.transform(value, validate=validate, allow_extra=allow_extra, type_hints=dict_content_hints)
                    # values that already are a target_class instance are left untouched
                    if not isinstance(value, target_class):
                        if not (optional and value is None):
                            assert issubclass(target_class, Munch)
                            # despite the above assert, mypy doesn't seem to understand target_class is a Munch here
                            kwargs = {'validate': validate} if issubclass(target_class, DictScheme) else {}
                            value = target_class(value, **kwargs)  # type:ignore[attr-defined]
                # handle numerics
                elif set(_classes).intersection([int, float]) and isinstance(value, str) and str not in _classes:
                    parsed_number = None
                    # tried in order: decimal int, int with base auto-detection, float
                    parsers: list[tuple[type, list]] = [(int, [10]), (int, [0]), (float, [])]
                    for _cls, args in parsers:
                        if _cls not in _classes:
                            continue
                        try:
                            parsed_number = _cls(value, *args)
                            break
                        except ValueError:
                            continue
                    if parsed_number is None:
                        if validate:
                            raise Exception(f"Couldn't parse string value {repr(value)} for key '{key}' into number formats: " +
                                            (', '.join(list(c.__name__ for c in _classes))))
                    else:
                        value = parsed_number
                if validate:
                    if not isinstance(value, _classes):
                        raise Exception(f'key "{key}" has value of wrong type! expected: '
                                        f'{" ,".join([ c.__name__ for c in _classes])}; '
                                        f'got: {type(value).__name__}; value: {value}')
            elif validate and not allow_extra:
                logging.debug(f"{cls}: unknown key '{key}': {value}")
                raise Exception(f'{cls}: Unknown key "{key}"')
            else:
                if isinstance(value, dict) and not isinstance(value, Munch):
                    value = Munch.fromDict(value)
            results[key] = value
        return results

    @classmethod
    def fromDict(cls, values: Mapping[str, Any], validate: bool = True):
        """Alternate constructor: build an instance of `cls` from `values`."""
        return cls(d=values, validate=validate)

    def toDict(
        self,
        strip_hidden: Optional[bool] = None,
        sparse: Optional[bool] = None,
    ):
        """
        Return the content as a plain dict, recursing into nested dicts.

        :param strip_hidden: drop keys starting with '_'; None uses the class default.
        :param sparse: drop keys whose value is None; None uses the class default.
        """
        return self.strip_dict(
            self,
            strip_hidden=strip_hidden,
            sparse=sparse,
            recursive=True,
        )

    @classmethod
    def strip_dict(
        cls,
        d: dict[Any, Any],
        strip_hidden: Optional[bool] = None,
        sparse: Optional[bool] = None,
        recursive: bool = True,
        hints: Optional[dict[str, Any]] = None,
        validate: bool = True,
    ) -> dict[Any, Any]:
        """
        Return a plain-dict copy of `d`, optionally without hidden/None entries.

        :param d: the mapping to convert; not modified.
        :param strip_hidden: drop keys starting with '_'; None uses ``cls._strip_hidden``.
        :param sparse: drop keys whose value is None; None uses ``cls._sparse``.
        :param recursive: also convert nested dicts.
        :param hints: type hints for the keys of `d`; None uses ``cls._type_hints``.
        :param validate: raise on non-str keys and on None values whose hint does
            not allow None, instead of logging a warning. Forwarded to the
            recursion into nested plain dicts; nested DictScheme instances go
            through their own ``toDict()``.
        :raises Exception: on the violations above when `validate` is set.
        """
        # preserve original None-type args
        _sparse = cls._sparse if sparse is None else sparse
        _strip_hidden = cls._strip_hidden if strip_hidden is None else strip_hidden
        hints = cls._type_hints if hints is None else hints
        result = dict(d)
        if not (_strip_hidden or _sparse or result):
            return result
        for k, v in d.items():
            # "abc" is a placeholder hint for keys without one; it resolves to itself
            type_hint = resolve_type_hint(hints.get(k, "abc"))
            if not isinstance(k, str):
                msg = f"strip_dict(): unknown key type {k=}: {type(k)=}"
                if validate:
                    raise Exception(msg)
                logging.warning(f"{msg} (skipping)")
                continue
            if _strip_hidden and k.startswith('_'):
                result.pop(k)
                continue
            if v is None:
                if NoneType not in type_hint:
                    msg = f'encountered illegal null value at key "{k}" for typehint {type_hint}'
                    if validate:
                        raise Exception(msg)
                    logging.warning(msg)
                if _sparse:
                    result.pop(k)
                continue
            if recursive and isinstance(v, dict):
                if not v:
                    result[k] = {}
                    continue
                if isinstance(v, DictScheme):
                    # pass None in sparse and strip_hidden
                    # (so the nested scheme falls back to its own class defaults)
                    result[k] = v.toDict(strip_hidden=strip_hidden, sparse=sparse)
                    continue
                if isinstance(v, Munch):
                    result[k] = v.toDict()
                if k not in hints:
                    continue
                _subhints = {}
                _hints = resolve_type_hint(hints[k], [dict])
                hints_flat = list(flatten_hints(_hints))
                subclass = DictScheme
                # the first dict[...] or DictScheme hint decides how the subdict
                # is stripped; all other hints are ignored
                for hint in hints_flat:
                    if get_origin(hint) == dict:
                        _valtype = get_args(hint)[1]
                        _subhints = {n: _valtype for n in v.keys()}
                        break
                    if isinstance(hint, type) and issubclass(hint, DictScheme):
                        subclass = hint
                        _subhints = hint._type_hints
                        break
                result[k] = subclass.strip_dict(
                    v,
                    hints=_subhints,
                    sparse=_sparse,
                    strip_hidden=_strip_hidden,
                    recursive=recursive,
                    validate=validate,
                )
        return result

    def update(self, d: Mapping[str, Any], validate: bool = True):
        """Merge `d` into this instance after passing it through transform()."""
        Munch.update(self, type(self).transform(d, validate))

    def __init_subclass__(cls):
        """Record the subclass' non-ClassVar annotations as its ``_type_hints``."""
        super().__init_subclass__()
        cls._type_hints = {name: hint for name, hint in get_type_hints(cls).items() if get_origin(hint) is not ClassVar}

    def __repr__(self):
        return f'{type(self)}{dict.__repr__(dict(self))}'

    def toYAML(
        self,
        strip_hidden: Optional[bool] = None,
        sparse: Optional[bool] = None,
        **yaml_args,
    ) -> str:
        """
        Serialise ``toDict()`` with ``yaml.dump()``.

        ``sort_keys=False`` is the default; any `yaml_args` override it and are
        passed on to ``yaml.dump()``.
        """
        import yaml
        yaml_args = {'sort_keys': False} | yaml_args
        return yaml.dump(
            self.toDict(strip_hidden=strip_hidden, sparse=sparse),
            **yaml_args,
        )

    def toToml(
        self,
        strip_hidden: Optional[bool] = None,
        sparse: Optional[bool] = None,
        encoder: Optional[TomlEncoder] = TomlPreserveInlineDictEncoder(),
    ) -> str:
        """
        Serialise ``toDict()`` with ``toml.dumps()``.

        :param encoder: passed to ``toml.dumps()``. The default encoder instance
            is created once, when the class body is executed, and shared by all
            calls that do not pass their own.
        """
        return toml.dumps(
            self.toDict(strip_hidden=strip_hidden, sparse=sparse),
            encoder=encoder,
        )
|
|
|
|
|
|
|
|
|
2023-03-27 09:05:30 +02:00
|
|
|
class TomlInlineDict(dict, toml.decoder.InlineTableDict):
    """
    A plain dict that additionally derives from toml's ``InlineTableDict``.

    Adds no behaviour of its own; it only carries the ``InlineTableDict`` base
    class (presumably so inline-preserving toml encoders emit it as an inline
    table - confirm against the toml library).
    """
|
2023-03-21 20:53:17 +01:00
|
|
|
|
|
|
|
|
2023-03-27 09:05:30 +02:00
|
|
|
def toml_inline_dicts(value: Any) -> Any:
    """
    Recursively rebuild every Mapping inside `value` as a TomlInlineDict.

    Non-Mapping values are returned unchanged; only Mapping values are descended
    into, so Mappings nested inside lists or other containers are not converted.
    """
    if isinstance(value, Mapping):
        converted = {}
        for key, item in value.items():
            converted[key] = toml_inline_dicts(item)
        return TomlInlineDict(converted)
    return value
|